eagle-commits mailing list archives

From: h..@apache.org
Subject: [38/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"
Date: Thu, 19 Nov 2015 10:47:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
deleted file mode 100644
index e2665bd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.hdfs;
-
-import com.typesafe.config.Config;
-import eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-public class HDFSSourcedStormSpoutProvider extends AbstractStormSpoutProvider {
-	private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class);
-	
-	public abstract static class HDFSSpout extends BaseRichSpout{
-		public abstract void copyFiles(); 
-		public void fail(Object msgId) {
-		    int transactionId = (Integer) msgId;
-		    LOG.info(transactionId + " failed");
-		}
-		
-		public void ack(Object msgId) {
-		    int transactionId = (Integer) msgId;
-		    LOG.info(transactionId + " acknowledged");
-		}
-		
-		public static HDFSSpout getHDFSSpout(String conf, Config configContext){
-			if(conf.equalsIgnoreCase("data collection")){
-				return new DataCollectionHDFSSpout(configContext); 
-			}
-			if(conf.equalsIgnoreCase("user profile generation")){
-				return new UserProfileGenerationHDFSSpout(configContext); 
-			}
-			return null;
-		}
-	}
-	
-	@Override
-	public BaseRichSpout getSpout(Config context){
-		LOG.info("GetHDFSSpout called");
-		String typeOperation = context.getString("dataSourceConfig.typeOperation");
-		HDFSSpout spout = HDFSSpout.getHDFSSpout(typeOperation, context);
-		spout.copyFiles();
-		return spout;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
deleted file mode 100644
index 68a62a6..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import eagle.dataproc.core.StreamingProcessConstants;
-import eagle.dataproc.core.ValuesArray;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;
-import com.esotericsoftware.minlog.Log;
-
-public class UserProfileGenerationHDFSSpout extends HDFSSpout{
-
-	private static final long serialVersionUID = 2274234104008894386L;
-	private Config configContext;
-	private TopologyContext _context; 
-	SpoutOutputCollector _collector;
-	
-	public class UserProfileData implements Serializable{
-		private static final long serialVersionUID = -3315860110144736840L;
-		private String user; 
-		private List<String> dateTime = new ArrayList<String>(); 
-		private List<Integer> hrInDay = new ArrayList<Integer>(); 
-		private List<String> line = new ArrayList<String>();
-		
-		public String getUser() {
-			return user;
-		}
-		public void setUser(String user) {
-			this.user = user;
-		}
-		public String getDateTime(int index) {
-			return dateTime.get(index);
-		}
-		public List<String> getDateTimes() {
-			return dateTime;
-		}
-		public void setDateTime(String dateTime) {
-			this.dateTime.add(dateTime);
-		}
-		public int getHrInDay(int index) {
-			return hrInDay.get(index);
-		}
-		public List<Integer> getHrsInDay() {
-			return hrInDay;
-		}
-		public void setHrInDay(int hrInDay) {
-			this.hrInDay.add(hrInDay);
-		}
-		public String getLine(int index) {
-			return line.get(index);
-		}
-		public List<String> getLines() {
-			return line;
-		}
-		public void setLine(String line) {
-			this.line.add(line);
-		} 
-		
-	}
-	
-	private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class);
-	
-	public UserProfileGenerationHDFSSpout(Config configContext){
-		this.configContext = configContext;
-		LOG.info("UserProfileGenerationHDFSSpout called");
-	}
-	
-	public void copyFiles(){
-		LOG.info("Inside listFiles()");
-		//Configuration conf = new Configuration();
-		JobConf conf = new JobConf();
-		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-		ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-        if(LOG.isDebugEnabled()) {
-			for (URL url : urls) {
-				LOG.debug(url.getFile());
-			}
-		}
-		// _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-       
-		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-		LOG.info("HDFS path: " + hdfsPath);
-		 
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("copyToPath: " + copyToPath);
-		String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
-		Path srcPath = new Path(srcPathStr); 
-		LOG.info("listFiles called");
-		LOG.info("srcPath: " + srcPath);
-		try {
-			FileSystem fs = srcPath.getFileSystem(conf);
-			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
-			CompressionCodec codec = codecFactory.getCodec(srcPath);
-			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-			*/
-			
-			Path destPath = new Path(copyToPath);
-			LOG.info("Destination path: " + destPath);
-			String userListFileName = configContext.getString("dataSourceConfig.userList");
-			//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-			List<String> userList = getUser(userListFileName);
-			for(String user:userList){
-				Path finalSrcPath = new Path(srcPath.getName() + "/" + user);
-				fs.copyToLocalFile(finalSrcPath, destPath);
-			}
-			LOG.info("Copy to local succeed");
-			fs.close();
-							
-		} catch (IOException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-		
-	}
-	
-	private List<String> getAllFiles(String root, int level){
-		
-		List<String> lists = new ArrayList<String>();
-		File rootFile = new File(root);
-		File[] tempList = rootFile.listFiles();
-		if(tempList == null)
-			return lists; 
-		
-		for(File temp:tempList){
-			if(temp.isDirectory())
-				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-			else{
-				if(temp.getName().endsWith(".csv"))
-					lists.add(temp.getAbsolutePath());
-			}
-		}
-		return lists;
-			
-	}
-	
-	public List<String> listFiles(String path){
-		
-		LOG.info("Reading from: " + path);
-		List<String> files = new ArrayList<String>();
-		files = getAllFiles(path, 0); 
-		return files;
-	}
-	
-	private List<String> getUser(String listFileName){
-		List<String> userList = new ArrayList<String>();
-		BufferedReader reader = null; 
-		try{
-			InputStream is = getClass().getResourceAsStream(listFileName);
-			reader = new BufferedReader(new InputStreamReader(is));
-			String line = ""; 
-			while((line = reader.readLine()) != null){
-				userList.add(line);
-				LOG.info("User added:" + line);
-			}
-		}catch(Exception e){
-			e.printStackTrace();
-		}finally{
-			try {
-				if(reader != null)
-					reader.close();
-			} catch (IOException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
-			}
-		}
-		return userList;
-	}
-	
-	@Override
-	public void nextTuple() {
-		LOG.info("Releasing nextTuple");
-		
-		String userListFileName = configContext.getString("dataSourceConfig.userList");
-
-		//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-		List<String> userList = getUser(userListFileName);
-		//loggerHDFSSpout.info("user list size:" + userList.size());
-		for(String user: userList){
-			LOG.info("Processing user: " + user);
-			String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-			//loggerHDFSSpout.info("copyToPath: " + copyToPath);
-			
-			copyToPath +="/" + user; 
-			List<String> files = listFiles(copyToPath);
-			LOG.info("Files returned: " + files.size());
-			String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-			//loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile);
-			UserProfileData usersProfileDataset = new UserProfileData();
-				
-			for(String fileName:files){
-				LOG.info("FileName: " + fileName);
-				usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf(".")));
-				BufferedReader br = null; 
-				Reader decoder = null;
-				InputStream inStream = null;
-				
-				try{
-					inStream = new FileInputStream(new File(fileName));
-					decoder = new InputStreamReader(inStream);
-					br = new BufferedReader(decoder);
-					int lineNo = 0; 
-					String line = "";
-					while((line = br.readLine())!= null){
-						boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader");
-						//loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader);
-						if(containsFileHeader == true && lineNo == 0){
-							// ignore the header column
-							lineNo++;
-							continue;
-						}
-			        	//loggerHDFSSpout.info("emitting line from file: " + fileName);
-			        	
-						usersProfileDataset.setLine(line);
-						usersProfileDataset.setHrInDay(lineNo);
-			        	lineNo++;
-					}
-				}
-				catch (Exception e) {
-					Log.error("File operation failed");
-					throw new IllegalStateException();
-				}finally{
-					try {
-						if(br != null)
-							br.close();
-						if(decoder != null)
-							decoder.close();
-						if(inStream != null)
-							inStream.close();
-					} catch (IOException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-				}
-			}
-			usersProfileDataset.setUser(user);
-			_collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset));
-        	LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size());
-			Utils.sleep(1000);
-		}
-		this.close();
-	}
-	
-	@Override
-	public void open(Map arg0, TopologyContext context,
-			SpoutOutputCollector collector) {
-		 _collector = collector;
-		 _context = context;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// TODO Auto-generated method stub
-		declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP));
-	}
-	
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
deleted file mode 100644
index b424aa2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.kafka;
-
-import java.util.Arrays;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
-
-public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
-
-	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
-	}
-
-	@Override
-	public BaseRichSpout getSpout(Config context){
-		// Kafka topic
-		String topic = context.getString("dataSourceConfig.topic");
-		// Kafka consumer group id
-		String groupId = context.getString("dataSourceConfig.consumerGroupId");
-		// Kafka fetch size
-		int fetchSize = context.getInt("dataSourceConfig.fetchSize");
-		// Kafka deserializer class
-		String deserClsName = context.getString("dataSourceConfig.deserializerClass");
-		// Kafka broker zk connection
-		String zkConnString = context.getString("dataSourceConfig.zkConnection");
-		// transaction zkRoot
-		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-		// Site
-		String site = context.getString("eagleProps.site");
-
-        //String realTopic = (site ==null)? topic : String.format("%s_%s",site,topic);
-
-        LOG.info(String.format("Use topic id: %s",topic));
-		
-		BrokerHosts hosts = new ZkHosts(zkConnString);
-		SpoutConfig spoutConfig = new SpoutConfig(hosts, 
-				topic,
-				zkRoot + "/" + topic,
-				groupId);
-		
-		// transaction zkServers
-		spoutConfig.zkServers = Arrays.asList(context.getString("dataSourceConfig.transactionZKServers").split(","));
-		// transaction zkPort
-		spoutConfig.zkPort = context.getInt("dataSourceConfig.transactionZKPort");
-		// transaction update interval
-		spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
-		// Kafka fetch size
-		spoutConfig.fetchSizeBytes = fetchSize;		
-		// "startOffsetTime" is for test usage, prod should not use this
-		if (context.hasPath("dataSourceConfig.startOffsetTime")) {
-			spoutConfig.startOffsetTime = context.getInt("dataSourceConfig.startOffsetTime");
-		}		
-		// "forceFromStart" is for test usage, prod should not use this 
-		if (context.hasPath("dataSourceConfig.forceFromStart")) {
-			spoutConfig.forceFromStart = context.getBoolean("dataSourceConfig.forceFromStart");
-		}
-		
-		spoutConfig.scheme = getStreamScheme(deserClsName, context);
-		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
-		return kafkaSpout;
-	}
-}
\ No newline at end of file

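For reference, getSpout above reads a fixed set of configuration keys. A minimal sketch of driving the provider follows; the key names are taken verbatim from the code, while the topic, group id, host names, and deserializer class are placeholders invented for illustration (a real class implementing SpoutKafkaMessageDeserializer must be on the classpath for the reflective construction to succeed). The import uses the post-rename package this commit introduces.

    import backtype.storm.topology.base.BaseRichSpout;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;

    public class KafkaSpoutProviderSketch {
        public static void main(String[] args) {
            // Every key below is read by KafkaSourcedSpoutProvider.getSpout;
            // all values here are placeholders.
            Config config = ConfigFactory.parseString(
                "dataSourceConfig.topic = auditLog\n"
                + "dataSourceConfig.consumerGroupId = eagleConsumer\n"
                + "dataSourceConfig.fetchSize = 1048576\n"
                + "dataSourceConfig.deserializerClass = \"org.example.MyDeserializer\"\n"
                + "dataSourceConfig.zkConnection = \"zkhost:2181\"\n"
                + "dataSourceConfig.transactionZKRoot = \"/consumers\"\n"
                + "dataSourceConfig.transactionZKServers = \"zkhost\"\n"
                + "dataSourceConfig.transactionZKPort = 2181\n"
                + "dataSourceConfig.transactionStateUpdateMS = 2000\n"
                + "eagleProps.site = sandbox");
            BaseRichSpout spout = new KafkaSourcedSpoutProvider().getSpout(config);
        }
    }
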
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
deleted file mode 100644
index a3df9f7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-
-import java.lang.reflect.Constructor;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Map;
-
-/**
- * This scheme defines how a kafka message is deserialized and the output field name for storm stream
- * it includes the following:
- * 1. data source is kafka, so need kafka message deserializer class
- * 2. output field declaration
- */
-public class KafkaSourcedSpoutScheme implements Scheme {
-	protected SpoutKafkaMessageDeserializer deserializer;
-	
-	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
-		try{
-			Properties prop = new Properties();
-            if(context.getObject("eagleProps") != null) {
-                prop.putAll(context.getObject("eagleProps"));
-            }
-			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);
-			deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
-		}catch(Exception ex){
-			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
-		}
-	}
-	
-	@Override
-	public List<Object> deserialize(byte[] ser) {
-		Object tmp = deserializer.deserialize(ser);
-		Map<String, Object> map = (Map<String, Object>)tmp;
-		if(tmp == null)
-			return null;
-		// the following tasks are executed within the same process of kafka spout
-		return Arrays.asList(tmp);
-	}
-	
-	@Override
-	public Fields getOutputFields() {
-//		return new Fields(deserializer.getOutputFields());
-		throw new UnsupportedOperationException("output fields should be declared in sub class of KafkaSourcedSpoutProvider");
-	}
-}

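Since getOutputFields above deliberately throws, every usable scheme is a subclass that declares its own fields. A minimal sketch of such a subclass, assuming the post-rename package for the import; the single field name is an assumption for illustration:

    import backtype.storm.tuple.Fields;
    import com.typesafe.config.Config;
    import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;

    public class MapOutputKafkaScheme extends KafkaSourcedSpoutScheme {
        public MapOutputKafkaScheme(String deserClsName, Config context) {
            super(deserClsName, context);
        }

        @Override
        public Fields getOutputFields() {
            // One output field carrying the deserialized attribute map.
            return new Fields("attributeMap");
        }
    }
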
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
deleted file mode 100644
index c72d13f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.impl.storm.kafka;
-
-import java.io.Serializable;
-
-public interface SpoutKafkaMessageDeserializer extends Serializable{
-	public Object deserialize(byte[] arg0);
-}

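KafkaSourcedSpoutScheme instantiates its deserializer reflectively through a Properties constructor, so implementations need exactly that signature. A minimal sketch that decodes the payload as UTF-8 text and wraps it in a map; the map shape is an assumption, chosen because the scheme's deserialize method casts the result to Map<String, Object>:

    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;

    public class PlainTextMessageDeserializer implements SpoutKafkaMessageDeserializer {
        private static final long serialVersionUID = 1L;
        private final Properties props;

        // KafkaSourcedSpoutScheme looks up this exact constructor via reflection.
        public PlainTextMessageDeserializer(Properties props) {
            this.props = props;
        }

        @Override
        public Object deserialize(byte[] arg0) {
            Map<String, Object> event = new HashMap<String, Object>();
            event.put("message", new String(arg0, Charset.forName("UTF-8")));
            return event;
        }
    }
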
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapper.java
deleted file mode 100644
index 9de9013..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream;
-
-import java.util.List;
-
-public interface JavaMapper {
-    List<Object> map(List<Object> input);
-}

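A minimal sketch of implementing the interface; the transformation (upper-casing string values) is invented for illustration, and the class is marked Serializable so Storm can ship a bolt that holds it. The import assumes the post-rename package.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.eagle.datastream.JavaMapper;

    // Hypothetical mapper: upper-cases String values, passes everything else through.
    public class UpperCaseMapper implements JavaMapper, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public List<Object> map(List<Object> input) {
            List<Object> output = new ArrayList<Object>(input.size());
            for (Object value : input) {
                output.add(value instanceof String ? ((String) value).toUpperCase() : value);
            }
            return output;
        }
    }
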
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapperStormExecutor.java
deleted file mode 100644
index 393cdd1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaMapperStormExecutor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-public class JavaMapperStormExecutor extends BaseRichBolt{
-    public static class e1 extends JavaMapperStormExecutor {
-        public e1(JavaMapper mapper){
-            super(1, mapper);
-        }
-    }
-    public static class e2 extends JavaMapperStormExecutor {
-        public e2(JavaMapper mapper){
-            super(2, mapper);
-        }
-    }
-    public static class e3 extends JavaMapperStormExecutor {
-        public e3(JavaMapper mapper){
-            super(3, mapper);
-        }
-    }
-    public static class e4 extends JavaMapperStormExecutor {
-        public e4(JavaMapper mapper){
-            super(4, mapper);
-        }
-    }
-
-    private JavaMapper mapper;
-    private OutputCollector collector;
-    private int numOutputFields;
-    public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){
-        this.numOutputFields = numOutputFields;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        List<Object> ret = mapper.map(input.getValues());
-        this.collector.emit(ret);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> fields = new ArrayList<String>();
-        for(int i=0; i<numOutputFields; i++){
-            fields.add(OutputFieldNameConst.FIELD_PREFIX() + i);
-        }
-        declarer.declare(new Fields(fields));
-    }
-}
\ No newline at end of file

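The nested e1..e4 classes above only fix how many output fields the bolt declares. A sketch of wiring a one-output-field mapper into a topology, reusing the hypothetical UpperCaseMapper from the JavaMapper sketch; the component ids are invented, and "lineSpout" is assumed to be registered elsewhere on the builder:

    import backtype.storm.topology.TopologyBuilder;
    import org.apache.eagle.datastream.JavaMapperStormExecutor;

    public class MapperBoltWiringSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            // e1 => the bolt declares a single output field named
            // OutputFieldNameConst.FIELD_PREFIX() + 0.
            builder.setBolt("upperCaseBolt",
                    new JavaMapperStormExecutor.e1(new UpperCaseMapper()), 1)
                   .shuffleGrouping("lineSpout");
        }
    }
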
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaStormExecutorForAlertWrapper.java
deleted file mode 100644
index 836029a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream;
-
-import com.typesafe.config.Config;
-
-import java.util.List;
-import java.util.SortedMap;
-
-public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
-    private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
-    private String streamName;
-    public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){
-        this.delegate = delegate;
-        this.streamName = streamName;
-    }
-    @Override
-    public void prepareConfig(Config config) {
-        delegate.prepareConfig(config);
-    }
-
-    @Override
-    public void init() {
-        delegate.init();
-    }
-
-    @Override
-    public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) {
-        Collector delegateCollector = new Collector(){
-            @Override
-            public void collect(Object o) {
-                Tuple2 tuple2 = (Tuple2)o;
-                collector.collect(new Tuple3(tuple2.f0(), streamName, tuple2.f1()));
-            }
-        };
-        delegate.flatMap(input, delegateCollector);
-    }
-}

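A hedged sketch of constructing the wrapper: the stream name is illustrative, the delegate factory is a stub (building a concrete JavaStormStreamExecutor is outside this diff), and the package declaration assumes the datastream types end up under org.apache.eagle per this commit's rename.

    package org.apache.eagle.datastream;

    import java.util.SortedMap;

    public class AlertWrapperWiringSketch {
        public static void main(String[] args) {
            // Wrap a two-field executor so each emitted tuple carries an
            // explicit stream name as its middle field.
            JavaStormExecutorForAlertWrapper wrapped = new JavaStormExecutorForAlertWrapper(
                    obtainDelegate(), "userActivity"); // stream name is illustrative
        }

        // Stand-in for however the application builds its two-field executor;
        // returning null keeps the sketch self-contained.
        private static JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> obtainDelegate() {
            return null;
        }
    }
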
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
new file mode 100644
index 0000000..1332887
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/AbstractStormSpoutProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm;
+
+import backtype.storm.topology.base.BaseRichSpout;
+
+import com.typesafe.config.Config;
+
+/**
+ * A Storm spout is normally a dedicated component of a Storm topology; the concrete
+ * spout implementation backing a provider is obtained from its getSpout method.
+ */
+public abstract class AbstractStormSpoutProvider{
+	public abstract BaseRichSpout getSpout(Config context);
+}
\ No newline at end of file

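The extension point is small: a subclass only decides which spout to hand back for a given Config. A minimal sketch of a custom provider, hypothetically hard-wired to the data-collection spout introduced below:

    package org.apache.eagle.dataproc.impl.storm;

    import backtype.storm.topology.base.BaseRichSpout;
    import com.typesafe.config.Config;
    import org.apache.eagle.dataproc.impl.storm.hdfs.DataCollectionHDFSSpout;

    // Hypothetical provider that always returns the data-collection spout.
    public class FixedHdfsSpoutProvider extends AbstractStormSpoutProvider {
        @Override
        public BaseRichSpout getSpout(Config context) {
            return new DataCollectionHDFSSpout(context);
        }
    }
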
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
new file mode 100644
index 0000000..2b00b92
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.*;
+import java.util.zip.GZIPInputStream;
+
+import com.typesafe.config.Config;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;
+
+public class DataCollectionHDFSSpout extends HDFSSpout{
+
+	private static final long serialVersionUID = 8775646842131298552L;
+	private Config configContext;
+	private TopologyContext _context; 
+	SpoutOutputCollector _collector;
+	private Map<String, Boolean> processFileMap = null; 
+	private static final Logger LOG = LoggerFactory.getLogger(DataCollectionHDFSSpout.class);
+	
+	public DataCollectionHDFSSpout(Config configContext){
+		this.configContext = configContext;
+		processFileMap = new HashMap<String, Boolean>();
+		LOG.info("DataCollectionHDFSSpout called");
+		
+	}
+	
+	public void copyFiles(){
+		LOG.info("Inside listFiles()");
+		Configuration conf = new Configuration(); 
+		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
+		ClassLoader cl = ClassLoader.getSystemClassLoader();
+        URL[] urls = ((URLClassLoader)cl).getURLs();
+		if(LOG.isDebugEnabled()) {
+			for (URL url : urls) {
+				LOG.debug(url.getFile());
+			}
+		}
+		// _________________________________________
+        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnnection");
+        LOG.info("HDFS connection string: " + hdfsConnectionStr);
+       
+		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
+		LOG.info("HDFS path: " + hdfsPath);
+		 
+		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
+		LOG.info("copyToPath: " + copyToPath);
+		String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
+		Path srcPath = new Path(srcPathStr); 
+		LOG.info("listFiles called");
+		LOG.info("srcPath: " + srcPath);
+		try {
+			FileSystem fs = srcPath.getFileSystem(conf);
+			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
+			CompressionCodec codec = codecFactory.getCodec(srcPath);
+			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
+			*/
+			
+			Path destPath = new Path(copyToPath);
+			LOG.info("Destination path: " + destPath);
+			fs.copyToLocalFile(srcPath, destPath);
+			LOG.info("Copy to local succeed");
+			fs.close();
+							
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+	}
+	
+	private List<String> getAllFiles(String root, int level){
+		
+		List<String> lists = new ArrayList<String>();
+		File rootFile = new File(root);
+		File[] tempList = rootFile.listFiles();
+		if(tempList == null)
+			return lists; 
+		
+		for(File temp:tempList){
+			if(temp.isDirectory())
+				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
+			else{
+				if(temp.getName().endsWith(".gz") || temp.getName().endsWith(".csv"))
+					lists.add(temp.getAbsolutePath());
+			}
+		}
+		return lists;
+			
+	}
+	
+	public List<String> listFiles(){
+		
+		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
+		LOG.info("Reading from: " + copyToPath);
+		List<String> files = new ArrayList<String>();
+		files = getAllFiles(copyToPath, 0); 
+		return files;
+	}
+	
+	@Override
+	public void nextTuple() {
+		LOG.info("Releasing nextTuple");
+		List<String> files = listFiles();
+		LOG.info("Files returned: " + files.size());
+		String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
+		LOG.info("typeOfFile returned: " + typeOfFile);
+		
+		for(String fileName:files){
+			LOG.info("fileName: " + fileName);
+			LOG.info("processFileMap.get(fileName): " + processFileMap.get(fileName));
+			if(processFileMap.get(fileName) == null || processFileMap.get(fileName) == false){
+				processFileMap.put(fileName, true);
+				BufferedReader br = null; 
+				Reader decoder = null;
+				GZIPInputStream in = null; 
+				InputStream inStream = null;
+				
+				try{
+					if(typeOfFile.equalsIgnoreCase("GZIP")){
+						in = new GZIPInputStream(new FileInputStream(new File(fileName)));
+						decoder = new InputStreamReader(in);
+					}else if(typeOfFile.equalsIgnoreCase("CSV")){
+						inStream = new FileInputStream(new File(fileName)); 
+						decoder = new InputStreamReader(inStream);
+					}else{
+						LOG.error("No known file type specified");
+						continue;
+					}
+					
+					br = new BufferedReader(decoder);
+					int lineNo = 0; 
+					String line = "";
+					while((line = br.readLine())!= null){
+						++lineNo;
+			        	//String line = br.readLine();
+			        	//loggerHDFSSpout.info("line number " + lineNo + "is: " + line);
+			        	//if(line == null || line.equalsIgnoreCase(""))
+			        	//	break;
+			        	LOG.info("Emitting line from file: " + fileName);
+			        	//_collector.emit(new ValuesArray(line), lineNo);
+                        _collector.emit(Arrays.asList((Object)line));
+			        	LOG.info("Emitted line no: " + lineNo + " and line: " + line);
+						Utils.sleep(100);
+					}
+				}
+				catch (Exception e) {
+					// TODO: handle exception
+					e.printStackTrace();
+				}finally{
+					try {
+						if(br != null)
+							br.close();
+						if(decoder != null)
+							decoder.close();
+						if(in != null)
+							in.close();
+						if(inStream != null)
+							inStream.close();
+					} catch (IOException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}else{
+				LOG.info("Processed the files before, already done! ");
+				//Utils.sleep(10000);
+			}
+			
+		}
+		
+	}
+	
+	public void fail(Object msgId) {
+	    int transactionId = (Integer) msgId;
+	    LOG.info(transactionId + " failed");
+	}
+	
+	public void ack(Object msgId) {
+	    int transactionId = (Integer) msgId;
+	    LOG.info(transactionId + " acknowledged");
+	}
+
+	@Override
+	public void open(Map arg0, TopologyContext context,
+			SpoutOutputCollector collector) {
+		 _collector = collector;
+		 _context = context;
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		// TODO Auto-generated method stub
+		declarer.declare(new Fields("line"));
+	}
+}

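The configuration keys this spout reads, spelled exactly as in copyFiles(), listFiles(), and nextTuple() above -- including the triple-n "hdfsConnnection"; the host and path values are placeholders:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.dataproc.impl.storm.hdfs.DataCollectionHDFSSpout;

    public class DataCollectionConfigSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                // Key spelled with a triple "n", matching the lookup in copyFiles().
                "dataSourceConfig.hdfsConnnection = \"namenode:8020\"\n"
                + "dataSourceConfig.hdfsPath = \"/eagle/input\"\n"
                + "dataSourceConfig.copyToPath = \"/tmp/eagle\"\n"
                // Accepted formats in nextTuple() are GZIP and CSV.
                + "dataSourceConfig.fileFormat = GZIP");
            DataCollectionHDFSSpout spout = new DataCollectionHDFSSpout(config);
        }
    }
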
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
new file mode 100644
index 0000000..75305f1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.hdfs;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.topology.base.BaseRichSpout;
+
+public class HDFSSourcedStormSpoutProvider extends AbstractStormSpoutProvider {
+	private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class);
+	
+	public abstract static class HDFSSpout extends BaseRichSpout{
+		public abstract void copyFiles(); 
+		public void fail(Object msgId) {
+		    int transactionId = (Integer) msgId;
+		    LOG.info(transactionId + " failed");
+		}
+		
+		public void ack(Object msgId) {
+		    int transactionId = (Integer) msgId;
+		    LOG.info(transactionId + " acknowledged");
+		}
+		
+		public static HDFSSpout getHDFSSpout(String conf, Config configContext){
+			if(conf.equalsIgnoreCase("data collection")){
+				return new DataCollectionHDFSSpout(configContext); 
+			}
+			if(conf.equalsIgnoreCase("user profile generation")){
+				return new UserProfileGenerationHDFSSpout(configContext); 
+			}
+			return null;
+		}
+	}
+	
+	@Override
+	public BaseRichSpout getSpout(Config context){
+		LOG.info("GetHDFSSpout called");
+		String typeOperation = context.getString("dataSourceConfig.typeOperation");
+		HDFSSpout spout = HDFSSpout.getHDFSSpout(typeOperation, context);
+		spout.copyFiles();
+		return spout;
+	}
+}
\ No newline at end of file

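getHDFSSpout recognizes exactly two typeOperation values and returns null for anything else, which getSpout would then dereference in spout.copyFiles(). A sketch of the dispatch; the dataSourceConfig keys each spout needs at runtime are elided here and shown in the config sketches around this file:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;

    public class HdfsSpoutDispatchSketch {
        public static void main(String[] args) {
            // The spouts read their dataSourceConfig keys later;
            // an empty config is enough to construct them.
            Config config = ConfigFactory.empty();
            // The only two recognized operations; any other string yields null.
            HDFSSpout collection = HDFSSpout.getHDFSSpout("data collection", config);
            HDFSSpout profiling = HDFSSpout.getHDFSSpout("user profile generation", config);
        }
    }
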
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
new file mode 100644
index 0000000..e07ee81
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.hdfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.core.StreamingProcessConstants;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import com.esotericsoftware.minlog.Log;
+
+public class UserProfileGenerationHDFSSpout extends HDFSSourcedStormSpoutProvider.HDFSSpout {
+
+	private static final long serialVersionUID = 2274234104008894386L;
+	private Config configContext;
+	private TopologyContext _context; 
+	SpoutOutputCollector _collector;
+	
+	public class UserProfileData implements Serializable{
+		private static final long serialVersionUID = -3315860110144736840L;
+		private String user; 
+		private List<String> dateTime = new ArrayList<String>(); 
+		private List<Integer> hrInDay = new ArrayList<Integer>(); 
+		private List<String> line = new ArrayList<String>();
+		
+		public String getUser() {
+			return user;
+		}
+		public void setUser(String user) {
+			this.user = user;
+		}
+		public String getDateTime(int index) {
+			return dateTime.get(index);
+		}
+		public List<String> getDateTimes() {
+			return dateTime;
+		}
+		public void setDateTime(String dateTime) {
+			this.dateTime.add(dateTime);
+		}
+		public int getHrInDay(int index) {
+			return hrInDay.get(index);
+		}
+		public List<Integer> getHrsInDay() {
+			return hrInDay;
+		}
+		public void setHrInDay(int hrInDay) {
+			this.hrInDay.add(hrInDay);
+		}
+		public String getLine(int index) {
+			return line.get(index);
+		}
+		public List<String> getLines() {
+			return line;
+		}
+		public void setLine(String line) {
+			this.line.add(line);
+		} 
+		
+	}
+	
+	private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class);
+	
+	public UserProfileGenerationHDFSSpout(Config configContext){
+		this.configContext = configContext;
+		LOG.info("UserProfileGenerationHDFSSpout called");
+	}
+	
+	public void copyFiles(){
+		LOG.info("Inside listFiles()");
+		//Configuration conf = new Configuration();
+		JobConf conf = new JobConf();
+		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
+		ClassLoader cl = ClassLoader.getSystemClassLoader();
+        URL[] urls = ((URLClassLoader)cl).getURLs();
+        if(LOG.isDebugEnabled()) {
+			for (URL url : urls) {
+				LOG.debug(url.getFile());
+			}
+		}
+		// _________________________________________
+        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection");
+        LOG.info("HDFS connection string: " + hdfsConnectionStr);
+       
+		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
+		LOG.info("HDFS path: " + hdfsPath);
+		 
+		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
+		LOG.info("copyToPath: " + copyToPath);
+		String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath);
+		Path srcPath = new Path(srcPathStr); 
+		LOG.info("listFiles called");
+		LOG.info("srcPath: " + srcPath);
+		try {
+			FileSystem fs = srcPath.getFileSystem(conf);
+			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
+			CompressionCodec codec = codecFactory.getCodec(srcPath);
+			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
+			*/
+			
+			Path destPath = new Path(copyToPath);
+			LOG.info("Destination path: " + destPath);
+			String userListFileName = configContext.getString("dataSourceConfig.userList");
+			//loggerHDFSSpout.info("userListFileName: " + userListFileName);
+			List<String> userList = getUser(userListFileName);
+			for(String user:userList){
+				Path finalSrcPath = new Path(srcPath.getName() + "/" + user);
+				fs.copyToLocalFile(finalSrcPath, destPath);
+			}
+			LOG.info("Copy to local succeed");
+			fs.close();
+							
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+	}
+	
+	private List<String> getAllFiles(String root, int level){
+		
+		List<String> lists = new ArrayList<String>();
+		File rootFile = new File(root);
+		File[] tempList = rootFile.listFiles();
+		if(tempList == null)
+			return lists; 
+		
+		for(File temp:tempList){
+			if(temp.isDirectory())
+				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
+			else{
+				if(temp.getName().endsWith(".csv"))
+					lists.add(temp.getAbsolutePath());
+			}
+		}
+		return lists;
+			
+	}
+	
+	public List<String> listFiles(String path){
+		
+		LOG.info("Reading from: " + path);
+		List<String> files = new ArrayList<String>();
+		files = getAllFiles(path, 0); 
+		return files;
+	}
+	
+	private List<String> getUser(String listFileName){
+		List<String> userList = new ArrayList<String>();
+		BufferedReader reader = null; 
+		try{
+			InputStream is = getClass().getResourceAsStream(listFileName);
+			reader = new BufferedReader(new InputStreamReader(is));
+			String line = ""; 
+			while((line = reader.readLine()) != null){
+				userList.add(line);
+				LOG.info("User added:" + line);
+			}
+		}catch(Exception e){
+			e.printStackTrace();
+		}finally{
+			try {
+				if(reader != null)
+					reader.close();
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+		return userList;
+	}
+	
+	@Override
+	public void nextTuple() {
+		LOG.info("Releasing nextTuple");
+		
+		String userListFileName = configContext.getString("dataSourceConfig.userList");
+
+		//loggerHDFSSpout.info("userListFileName: " + userListFileName);
+		List<String> userList = getUser(userListFileName);
+		//loggerHDFSSpout.info("user list size:" + userList.size());
+		for(String user: userList){
+			LOG.info("Processing user: " + user);
+			String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
+			//loggerHDFSSpout.info("copyToPath: " + copyToPath);
+			
+			copyToPath +="/" + user; 
+			List<String> files = listFiles(copyToPath);
+			LOG.info("Files returned: " + files.size());
+			String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
+			//loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile);
+			UserProfileData usersProfileDataset = new UserProfileData();
+				
+			for(String fileName:files){
+				LOG.info("FileName: " + fileName);
+				usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf(".")));
+				BufferedReader br = null; 
+				Reader decoder = null;
+				InputStream inStream = null;
+				
+				try{
+					inStream = new FileInputStream(new File(fileName));
+					decoder = new InputStreamReader(inStream);
+					br = new BufferedReader(decoder);
+					int lineNo = 0; 
+					String line = "";
+					while((line = br.readLine())!= null){
+						boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader");
+						//loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader);
+						if(containsFileHeader == true && lineNo == 0){
+							// ignore the header column
+							lineNo++;
+							continue;
+						}
+			        	//loggerHDFSSpout.info("emitting line from file: " + fileName);
+			        	
+						usersProfileDataset.setLine(line);
+						usersProfileDataset.setHrInDay(lineNo);
+			        	lineNo++;
+					}
+				}
+				catch (Exception e) {
+					Log.error("File operation failed");
+					throw new IllegalStateException();
+				}finally{
+					try {
+						if(br != null)
+							br.close();
+						if(decoder != null)
+							decoder.close();
+						if(inStream != null)
+							inStream.close();
+					} catch (IOException e) {
+						// TODO Auto-generated catch block
+						e.printStackTrace();
+					}
+				}
+			}
+			usersProfileDataset.setUser(user);
+			_collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset));
+        	LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size());
+			Utils.sleep(1000);
+		}
+		this.close();
+	}
+	
+	@Override
+	public void open(Map arg0, TopologyContext context,
+			SpoutOutputCollector collector) {
+		 _collector = collector;
+		 _context = context;
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		// TODO Auto-generated method stub
+		declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP));
+	}
+	
+	
+}

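The configuration keys this spout reads, as spelled in copyFiles() and nextTuple() above (note the double-n "hdfsConnection" here, unlike DataCollectionHDFSSpout); host and path values are placeholders, and the user list is resolved via getResourceAsStream, so it names a classpath resource:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.dataproc.impl.storm.hdfs.UserProfileGenerationHDFSSpout;

    public class UserProfileConfigSketch {
        public static void main(String[] args) {
            Config config = ConfigFactory.parseString(
                "dataSourceConfig.hdfsConnection = \"namenode:8020\"\n"
                + "dataSourceConfig.hdfsPath = \"/eagle/userprofile\"\n"
                + "dataSourceConfig.copyToPath = \"/tmp/eagle/userprofile\"\n"
                // Loaded with getResourceAsStream, i.e. a classpath resource.
                + "dataSourceConfig.userList = \"/users.txt\"\n"
                + "dataSourceConfig.fileFormat = CSV\n"
                + "dataSourceConfig.containsFileHeader = true");
            UserProfileGenerationHDFSSpout spout = new UserProfileGenerationHDFSSpout(config);
        }
    }
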
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
new file mode 100644
index 0000000..a7d7079
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import java.util.Arrays;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
+
+public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
+
+	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
+	}
+
+	@Override
+	public BaseRichSpout getSpout(Config context){
+		// Kafka topic
+		String topic = context.getString("dataSourceConfig.topic");
+		// Kafka consumer group id
+		String groupId = context.getString("dataSourceConfig.consumerGroupId");
+		// Kafka fetch size
+		int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+		// Kafka deserializer class
+		String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+		// Kafka broker zk connection
+		String zkConnString = context.getString("dataSourceConfig.zkConnection");
+		// transaction zkRoot
+		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+		// Site (read here but not currently applied to the topic name)
+		String site = context.getString("eagleProps.site");
+
+		LOG.info(String.format("Use topic id: %s", topic));
+		
+		BrokerHosts hosts = new ZkHosts(zkConnString);
+		SpoutConfig spoutConfig = new SpoutConfig(hosts, 
+				topic,
+				zkRoot + "/" + topic,
+				groupId);
+		
+		// transaction zkServers
+		spoutConfig.zkServers = Arrays.asList(context.getString("dataSourceConfig.transactionZKServers").split(","));
+		// transaction zkPort
+		spoutConfig.zkPort = context.getInt("dataSourceConfig.transactionZKPort");
+		// transaction update interval
+		spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+		// Kafka fetch size
+		spoutConfig.fetchSizeBytes = fetchSize;		
+		// "startOffsetTime" is for test usage, prod should not use this
+		if (context.hasPath("dataSourceConfig.startOffsetTime")) {
+			spoutConfig.startOffsetTime = context.getInt("dataSourceConfig.startOffsetTime");
+		}		
+		// "forceFromStart" is for test usage, prod should not use this 
+		if (context.hasPath("dataSourceConfig.forceFromStart")) {
+			spoutConfig.forceFromStart = context.getBoolean("dataSourceConfig.forceFromStart");
+		}
+		
+		spoutConfig.scheme = getStreamScheme(deserClsName, context);
+		return new KafkaSpout(spoutConfig);
+	}
+}
\ No newline at end of file
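
A minimal wiring sketch for the provider above, assuming it is used from the same package. The config keys mirror those read in getSpout(); all values are placeholders, and the deserializer class name is hypothetical (a deserializer sketch appears after the SpoutKafkaMessageDeserializer interface below). In practice getStreamScheme is overridden so the scheme can declare output fields:

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class KafkaSpoutWiringSketch {
    public static void main(String[] args) {
        // illustrative values; the deserializer class name is hypothetical
        Config config = ConfigFactory.parseString(
                "dataSourceConfig {\n"
                + "  topic = \"hdfs_audit_log\"\n"
                + "  consumerGroupId = \"eagle.consumer\"\n"
                + "  fetchSize = 1048576\n"
                + "  deserializerClass = \"org.apache.eagle.dataproc.impl.storm.kafka.StringMessageDeserializer\"\n"
                + "  zkConnection = \"localhost:2181\"\n"
                + "  transactionZKRoot = \"/consumers\"\n"
                + "  transactionZKServers = \"localhost\"\n"
                + "  transactionZKPort = 2181\n"
                + "  transactionStateUpdateMS = 2000\n"
                + "}\n"
                + "eagleProps { site = \"sandbox\" }");
        BaseRichSpout spout = new KafkaSourcedSpoutProvider().getSpout(config);
        new TopologyBuilder().setSpout("kafkaMsgConsumer", spout, 1);
    }
}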

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
new file mode 100644
index 0000000..8bdbcb5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This scheme defines how a Kafka message is deserialized and which output fields the
+ * Storm stream declares. It covers:
+ * 1. the Kafka message deserializer class, since the data source is Kafka
+ * 2. the output field declaration
+ */
+public class KafkaSourcedSpoutScheme implements Scheme {
+	protected SpoutKafkaMessageDeserializer deserializer;
+	
+	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
+		try{
+			Properties prop = new Properties();
+            if(context.getObject("eagleProps") != null) {
+                prop.putAll(context.getObject("eagleProps"));
+            }
+			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);
+			deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
+		}catch(Exception ex){
+			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
+		}
+	}
+	
+	@Override
+	public List<Object> deserialize(byte[] ser) {
+		Object tmp = deserializer.deserialize(ser);
+		if(tmp == null)
+			return null;
+		// wrap the deserialized object as a single-element tuple; this runs
+		// within the same process as the kafka spout
+		return Arrays.asList(tmp);
+	}
+	
+	@Override
+	public Fields getOutputFields() {
+		throw new UnsupportedOperationException("output fields must be declared in a subclass of KafkaSourcedSpoutScheme");
+	}
+}
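
Since getOutputFields() above is deliberately left to subclasses, here is a sketch of such a subclass declaring a single output field (the class and field names are hypothetical):

import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;

// hypothetical subclass: declares one output field for the deserialized payload
public class SingleFieldSpoutScheme extends KafkaSourcedSpoutScheme {
    public SingleFieldSpoutScheme(String deserClsName, Config context) {
        super(deserClsName, context);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("message");
    }
}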

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
new file mode 100644
index 0000000..76ca458
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import java.io.Serializable;
+
+public interface SpoutKafkaMessageDeserializer extends Serializable {
+	Object deserialize(byte[] message);
+}
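
A sketch of an implementation of this interface (the class is hypothetical). The Properties constructor is mandatory because KafkaSourcedSpoutScheme instantiates the configured class reflectively via getConstructor(Properties.class):

import java.nio.charset.StandardCharsets;
import java.util.Properties;

// hypothetical deserializer: decodes each Kafka message as a UTF-8 string
public class StringMessageDeserializer implements SpoutKafkaMessageDeserializer {
    private final Properties props;

    public StringMessageDeserializer(Properties props) {
        this.props = props;
    }

    @Override
    public Object deserialize(byte[] message) {
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }
}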

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
new file mode 100644
index 0000000..7e66478
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapper.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream;
+
+import java.util.List;
+
+public interface JavaMapper {
+    List<Object> map(List<Object> input);
+}
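
A sketch of a JavaMapper implementation (hypothetical class), assuming the tuple's first field is a String:

import java.util.ArrayList;
import java.util.List;

// hypothetical mapper: upper-cases the first field and passes the rest through
public class UpperCaseFirstFieldMapper implements JavaMapper {
    @Override
    public List<Object> map(List<Object> input) {
        List<Object> output = new ArrayList<Object>(input);
        if (!output.isEmpty() && output.get(0) instanceof String) {
            output.set(0, ((String) output.get(0)).toUpperCase());
        }
        return output;
    }
}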

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
new file mode 100644
index 0000000..04a80e5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+public class JavaMapperStormExecutor extends BaseRichBolt{
+    // e1..e4 are convenience subclasses that fix the number of declared output fields
+    public static class e1 extends JavaMapperStormExecutor {
+        public e1(JavaMapper mapper){
+            super(1, mapper);
+        }
+    }
+    public static class e2 extends JavaMapperStormExecutor {
+        public e2(JavaMapper mapper){
+            super(2, mapper);
+        }
+    }
+    public static class e3 extends JavaMapperStormExecutor {
+        public e3(JavaMapper mapper){
+            super(3, mapper);
+        }
+    }
+    public static class e4 extends JavaMapperStormExecutor {
+        public e4(JavaMapper mapper){
+            super(4, mapper);
+        }
+    }
+
+    private JavaMapper mapper;
+    private OutputCollector collector;
+    private int numOutputFields;
+    public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){
+        this.numOutputFields = numOutputFields;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        List<Object> ret = mapper.map(input.getValues());
+        this.collector.emit(ret);
+        // ack so the input tuple is not replayed when acking is enabled
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        List<String> fields = new ArrayList<String>();
+        for(int i=0; i<numOutputFields; i++){
+            fields.add(OutputFieldNameConst.FIELD_PREFIX() + i);
+        }
+        declarer.declare(new Fields(fields));
+    }
+}
\ No newline at end of file
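
A wiring sketch for the executor above, reusing the hypothetical UpperCaseFirstFieldMapper from the JavaMapper sketch; the spout id is assumed to be registered elsewhere. The eN subclass chosen must match the number of values the mapper returns:

import backtype.storm.topology.TopologyBuilder;

public class MapperBoltWiringSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // e2 declares two output fields, so the mapper must emit two values per tuple
        builder.setBolt("mapperBolt",
                new JavaMapperStormExecutor.e2(new UpperCaseFirstFieldMapper()), 1)
               .shuffleGrouping("kafkaMsgConsumer");
    }
}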

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
new file mode 100644
index 0000000..3aacb32
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
@@ -0,0 +1,55 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+
+import java.util.List;
+import java.util.SortedMap;
+
+public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
+    private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
+    private String streamName;
+    public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){
+        this.delegate = delegate;
+        this.streamName = streamName;
+    }
+    @Override
+    public void prepareConfig(Config config) {
+        delegate.prepareConfig(config);
+    }
+
+    @Override
+    public void init() {
+        delegate.init();
+    }
+
+    @Override
+    public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) {
+        // adapt the delegate's (key, attributes) pairs into (key, streamName, attributes)
+        // triples so downstream alert executors can tell which stream an event came from
+        Collector delegateCollector = new Collector(){
+            @Override
+            public void collect(Object o) {
+                Tuple2 tuple2 = (Tuple2)o;
+                collector.collect(new Tuple3(tuple2.f0(), streamName, tuple2.f1()));
+            }
+        };
+        delegate.flatMap(input, delegateCollector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractStreamProducerGraph.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractStreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractStreamProducerGraph.scala
deleted file mode 100644
index 35fdf0a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractStreamProducerGraph.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package eagle.datastream
-
-trait AbstractStreamProducerGraph {
-  def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector)
-  def addVertex(producer: StreamProducer)
-  def iterator() : Iterator[StreamProducer]
-  def isSource(v : StreamProducer) : Boolean
-  def outgoingEdgesOf(v : StreamProducer) : scala.collection.mutable.Set[StreamConnector]
-  def getNodeByName(name : String) : Option[StreamProducer]
-  def incomingVertexOf(v: StreamProducer) : scala.collection.mutable.Set[StreamProducer]
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyCompiler.scala
deleted file mode 100644
index 7d59aae..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyCompiler.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait AbstractTopologyCompiler{
-  def buildTopology : AbstractTopologyExecutor
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyExecutor.scala
deleted file mode 100644
index 869a25f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AbstractTopologyExecutor.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait AbstractTopologyExecutor {
-  def execute
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AlertExecutorConsumerUtils.scala
deleted file mode 100644
index b335f7f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/AlertExecutorConsumerUtils.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package eagle.datastream
-
-import java.util
-
-import eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
-import eagle.alert.notification.AlertNotificationExecutor
-import eagle.alert.persist.AlertPersistExecutor
-import eagle.executor.AlertExecutor
-import org.slf4j.{LoggerFactory, Logger}
-
-import scala.collection.mutable.ListBuffer
-
-/**
- * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-
-object AlertExecutorConsumerUtils {
-  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
-
-  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector], alertStreamProducers: List[StreamProducer]): Unit = {
-    var alertExecutorIdList : java.util.List[String] = new util.ArrayList[String]()
-    alertStreamProducers.map(x =>
-      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertExecutorId));
-    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertDefinitionDao
-    val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
-    val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
-
-    val entityDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),entityDedupExecutor)
-    val persistStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),persistExecutor)
-    val emailDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),emailDedupExecutor)
-    val notificationStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),notificationExecutor)
-    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
-    toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
-
-    alertStreamProducers.foreach(sp => {
-      toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
-      toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/ExecutionEnvironment.scala
deleted file mode 100644
index 832cd4b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/eagle/datastream/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import backtype.storm.topology.base.BaseRichSpout
-import com.typesafe.config.{Config, ConfigFactory}
-import eagle.dataproc.impl.storm.AbstractStormSpoutProvider
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
-import org.slf4j.LoggerFactory
-
-object ExecutionEnvironmentFactory{
-
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-}
-
-abstract class ExecutionEnvironment(config : Config){
-  def execute()
-}
-
-class StormExecutionEnvironment(config: Config) extends ExecutionEnvironment(config){
-  val LOG = LoggerFactory.getLogger(classOf[StormExecutionEnvironment])
-  val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
-
-  override def execute() : Unit = {
-    LOG.info("initial graph:\n")
-    GraphPrinter.print(dag)
-    new StreamAlertExpansion(config).expand(dag)
-    LOG.info("after StreamAlertExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamUnionExpansion(config).expand(dag)
-    LOG.info("after StreamUnionExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamGroupbyExpansion(config).expand(dag)
-    LOG.info("after StreamGroupbyExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamNameExpansion(config).expand(dag)
-    LOG.info("after StreamNameExpansion graph:")
-    GraphPrinter.print(dag)
-    new StreamParallelismConfigExpansion(config).expand(dag)
-    LOG.info("after StreamParallelismConfigExpansion graph:")
-    GraphPrinter.print(dag)
-    val stormDag = StormStreamDAGTransformer.transform(dag)
-    StormTopologyCompiler(config, stormDag).buildTopology.execute
-  }
-
-  def newSource(source: BaseRichSpout): StormSourceProducer ={
-    val ret = StormSourceProducer(UniqueId.incrementAndGetId(), source)
-    ret.config = config
-    ret.graph = dag
-    dag.addVertex(ret)
-    ret
-  }
-
-  def newSource(sourceProvider: AbstractStormSpoutProvider):StormSourceProducer = newSource(sourceProvider.getSpout(config))
-}
\ No newline at end of file

