eagle-commits mailing list archives

From h..@apache.org
Subject [37/51] [partial] incubator-eagle git commit: EAGLE-184 Migrate eagle website from https://github.com/eaglemonitoring/eaglemonitoring.github.io to document branch
Date Thu, 03 Mar 2016 18:10:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
deleted file mode 100644
index e0dadbf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
-import org.apache.hadoop.hbase.util.MD5Hash;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A simple aggregate executor hosts exactly one policy.
- *
- * Created on 1/10/16.
- */
-public class SimpleAggregateExecutor
-        extends JavaStormStreamExecutor2<String, AggregateEntity>
-        implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class);
-
-    private final String cql;
-    private final int partitionSeq;
-    private final int totalPartitionNum;
-
-    private final String[] upStreamNames;
-    private String policyId;
-    private String executorId;
-    private Config config;
-    private AggregateDefinitionAPIEntity aggDef;
-    private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator;
-
-    public SimpleAggregateExecutor(String[] upStreams, String cql, String policyType, int partitionSeq, int totalPartitionNum) {
-        this.cql = cql;
-        this.partitionSeq = partitionSeq;
-        this.upStreamNames = upStreams;
-        this.totalPartitionNum = totalPartitionNum;
-        // create a fixed-definition policy API entity and indicate that it carries the full definition
-        aggDef = new AggregateDefinitionAPIEntity();
-        aggDef.setTags(new HashMap<String, String>());
-        aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
-        // TODO make it more general, not only hard code siddhi cep support here.
-        try {
-            Map<String,Object> template = new HashMap<>();
-            template.put("type","siddhiCEPEngine");
-            template.put("expression",this.cql);
-            template.put("containsDefinition",true);
-            aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
-        } catch (Exception e) {
-            LOG.error("Simple aggregate generate policy definition failed!", e);
-        }
-        aggDef.setCreatedTime(new Date().getTime());
-        aggDef.setLastModifiedDate(new Date().getTime());
-        aggDef.setName("anonymous-aggregation-def");
-        aggDef.setOwner("anonymous");
-        aggDef.setEnabled(true);
-        aggDef.setDescription("anonymous aggregation definition");
-
-        String random = MD5Hash.getMD5AsHex(cql.getBytes());
-        policyId = "anonymousAggregatePolicyId-" + random;
-        executorId = "anonymousAggregateId-" + random;
-    }
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        evaluator = createPolicyEvaluator(aggDef);
-    }
-
-    /**
-     * Create a PolicyEvaluator instance from the evaluator class mapped to the policy type.
-     *
-     * @return PolicyEvaluator instance
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity alertDef) {
-        String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
-        Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
-        if (evalCls == null) {
-            String msg = "No policy evaluator defined for policy type : " + policyType;
-            LOG.error(msg);
-            throw new IllegalStateException(msg);
-        }
-
-        AbstractPolicyDefinition policyDef = null;
-        try {
-            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
-                    PolicyManager.getInstance().getPolicyModules(policyType));
-        } catch (Exception ex) {
-            LOG.error("Failed to initialize alert policy definition: " + alertDef.getPolicyDef(), ex);
-        }
-
-        PolicyEvaluator<AggregateDefinitionAPIEntity> pe;
-        PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context = new PolicyEvaluationContext<>();
-        context.policyId = alertDef.getTags().get("policyId");
-        context.alertExecutor = this;
-        context.resultRender = new AggregateResultRender();
-        try {
-            // create evaluator instances
-            pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls
-                    .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-                    .newInstance(config, context, policyDef, upStreamNames, false);
-        } catch (Exception ex) {
-            LOG.error("Failed to create new policyEvaluator", ex);
-            LOG.warn("Broken policy definition, stop running: " + alertDef.getPolicyDef());
-            throw new IllegalStateException(ex);
-        }
-        return pe;
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-        if (input.size() != 3)
-            throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name, and value (SortedMap)");
-        if (LOG.isDebugEnabled()) LOG.debug("Incoming message: " + input.get(2));
-        if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluator: " + evaluator);
-
-        try {
-            evaluator.evaluate(new ValuesArray(collector, input.get(1), input.get(2)));
-        } catch (Exception ex) {
-            LOG.error("Got an exception, but continuing to run: " + input.get(2).toString(), ex);
-        }
-    }
-
-    @Override
-    public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context, List<AggregateEntity> alerts) {
-        if (alerts != null && !alerts.isEmpty()) {
-            String policyId = context.policyId;
-            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
-            Collector outputCollector = context.outputCollector;
-            PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator = context.evaluator;
-            for (AggregateEntity entity : alerts) {
-                synchronized (this) {
-                    outputCollector.collect(new Tuple2<>(policyId, entity));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
-                }
-            }
-        }
-    }
-
-    @Override
-    public String getExecutorId() {
-        return executorId;
-    }
-
-    @Override
-    public int getPartitionSeq() {
-        return partitionSeq;
-    }
-}
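
For orientation, a minimal sketch of how this executor was typically wired up; the stream name, CQL statement, and policy-type value below are illustrative assumptions, not taken verbatim from Eagle code:

    // Hypothetical construction of the class above: one Siddhi CQL statement per executor.
    String[] upStreams = {"eventStream"};                                // assumed upstream name
    String cql = "from eventStream select user insert into aggStream;"; // illustrative Siddhi CQL
    SimpleAggregateExecutor agg = new SimpleAggregateExecutor(
            upStreams, cql, "siddhiCEPEngine" /* assumed policy type */,
            0 /* partitionSeq */, 1 /* totalPartitionNum */);
    // The constructor wraps the CQL into a JSON policy definition of the form
    // {"type":"siddhiCEPEngine","expression":"<cql>","containsDefinition":true}
    // and derives policyId/executorId from the MD5 hash of the CQL.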

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
deleted file mode 100644
index 62830ae..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-/**
- * Entity of a stream-analysis (aggregate) definition.
- *
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("aggregatedef")
-@ColumnFamily("f")
-@Prefix("aggregatedef")
-@Service(Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site", "dataSource", "executorId", "policyId", "policyType"})
-@Indexes({
-	@Index(name="Index_1_aggregateExecutorId", columns = { "executorId" }, unique = true),
-})
-@SuppressWarnings("serial")
-public class AggregateDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
-
-	@Column("a")
-	private String name;
-	@Column("b")
-	private String policyDef;
-	@Column("c")
-	private String description;
-	@Column("d")
-	private boolean enabled;
-	@Column("e")
-	private String owner;
-	@Column("f")
-	private long lastModifiedDate;
-	@Column("g")
-	private long createdTime;
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-		valueChanged("name");
-	}
-
-	public String getPolicyDef() {
-		return policyDef;
-	}
-
-	public void setPolicyDef(String policyDef) {
-		this.policyDef = policyDef;
-		valueChanged("policyDef");
-	}
-
-	public String getDescription() {
-		return description;
-	}
-
-	public void setDescription(String description) {
-		this.description = description;
-		valueChanged("description");
-	}
-
-	public boolean isEnabled() {
-		return enabled;
-	}
-
-	public void setEnabled(boolean enabled) {
-		this.enabled = enabled;
-		valueChanged("enabled");
-	}
-
-	public String getOwner() {
-		return owner;
-	}
-
-	public void setOwner(String owner) {
-		this.owner = owner;
-		valueChanged("owner");
-	}
-
-	public long getLastModifiedDate() {
-		return lastModifiedDate;
-	}
-
-	public void setLastModifiedDate(long lastModifiedDate) {
-		this.lastModifiedDate = lastModifiedDate;
-		valueChanged("lastModifiedDate");
-	}
-
-	public long getCreatedTime() {
-		return createdTime;
-	}
-
-	public void setCreatedTime(long createdTime) {
-		this.createdTime = createdTime;
-		valueChanged("createdTime");
-	}
-
-
-}
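
The annotations above bind this entity to the HBase-backed "aggregatedef" table, keyed by the listed tags. A populated instance might look like the following sketch (all values illustrative):

    AggregateDefinitionAPIEntity def = new AggregateDefinitionAPIEntity();
    def.setTags(new HashMap<String, String>()); // tag keys come from the @Tags annotation
    def.getTags().put("site", "sandbox");
    def.getTags().put("executorId", "aggExecutor1");
    def.setName("userCountAgg");
    def.setOwner("eagle");
    def.setEnabled(true);
    // each setter calls valueChanged(...) so the storage layer can
    // track which columns were modified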

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
deleted file mode 100644
index 64c20b2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Event entity during stream processing
- * 
- * @since Dec 17, 2015
- *
- */
-public class AggregateEntity implements Serializable {
-
-	private static final long serialVersionUID = 5911351515190098292L;
-
-    private List<Object> data = new LinkedList<>();
-
-    public void add(Object res) {
-        data.add(res);
-    }
-
-    public List<Object> getData() {
-        return data;
-    }
-	
-}
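
The entity is nothing more than a serializable list of values; typical use amounts to this (values illustrative):

    AggregateEntity row = new AggregateEntity();
    row.add("user1"); // values in whatever order the aggregation emits them
    row.add(42L);
    List<Object> values = row.getData();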

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
deleted file mode 100644
index 7c932d4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.aggregate.entity;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-/**
- * Created on 1/6/16.
- */
-public class AggregateEntityRepository extends EntityRepository {
-    public AggregateEntityRepository() {
-        entitySet.add(AggregateDefinitionAPIEntity.class);
-    }
-}
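
The repository pattern is one line per entity class; a hypothetical module handling additional entities would register them the same way:

    public class MyModuleRepository extends EntityRepository {
        public MyModuleRepository() {
            // register every API entity class this module persists (illustrative)
            entitySet.add(AggregateDefinitionAPIEntity.class);
        }
    }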

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
deleted file mode 100644
index 0732639..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-/**
- * Interface used by the stream framework to persist entities to storage.
- * 
- * @since Dec 19, 2015
- *
- */
-public interface IPersistService<T> {
-
-	boolean save(String stream, T apiEntity) throws Exception;
-}
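
To illustrate the contract, here is a trivial implementation that merely logs what it would persist; this class is purely hypothetical (assuming the usual slf4j imports) and was never part of Eagle:

    public class LoggingPersistService implements IPersistService<AggregateEntity> {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingPersistService.class);

        @Override
        public boolean save(String stream, AggregateEntity apiEntity) throws Exception {
            // a real implementation writes to Kafka, a database, etc.
            LOG.info("stream=" + stream + " entity=" + apiEntity.getData());
            return true;
        }
    }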

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
deleted file mode 100644
index 2e1754b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.core.StorageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.text.MessageFormat;
-import java.util.List;
-
-/**
- *
- * TODO: currently only intended to be used after an aggregation node (see the AggregateEntity reference here).
- * @since Dec 19, 2015
- *
- */
-public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class);
-
-	private Config config;
-	private IPersistService<AggregateEntity> persistService;
-	private String persistExecutorId;
-	private String persistType;
-
-	public PersistExecutor(String persistExecutorId, String persistType) {
-		this.persistExecutorId = persistExecutorId;
-		this.persistType = persistType;
-	}
-
-    @Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-
-    @Override
-	public void init() {
-		if (persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) {
-			Config subConfig = this.config.getConfig("persistExecutorConfigs" + "." + persistExecutorId);
-			persistService = new KafkaPersistService(subConfig);
-		} else {
-			throw new RuntimeException(String.format("Persist type '%s' not supported yet!", persistType));
-		}
-	}
-
-	@Override
-	public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) {
-		if (input.size() != 2) {
-			LOG.error(String.format("Persist executor expects exactly two elements per tuple, but got %d!",
-					input.size()));
-			return;
-		}
-
-		String policyId = (String) input.get(0);
-		AggregateEntity entity = (AggregateEntity) input.get(1);
-		try {
-			persistService.save("defaultOutput", entity);
-		} catch (Exception e) {
-			LOG.error(MessageFormat.format("persist entity failed: {0}", entity), e);
-		}
-	}
-
-}
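
init() resolves its settings from the path persistExecutorConfigs.<persistExecutorId>; a matching HOCON fragment might look like this sketch (the executor id and all values are assumptions):

    persistExecutorConfigs {
      persist1 {                 # must match the persistExecutorId constructor argument
        kafka {
          bootstrap_servers = "localhost:9092"
          topics { defaultOutput = "eagle_aggregate" }  # stream name -> topic
        }
      }
    }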

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
deleted file mode 100644
index ea61278..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * TODO: configurable null handling for serialization??
- * Created on 1/4/16.
- */
-public class AggregateEntitySerializer implements
-        Closeable, AutoCloseable, Serializer<AggregateEntity> {
-
-    private final StringSerializer stringSerializer = new StringSerializer();
-    private static final Logger logger = LoggerFactory.getLogger(AggregateEntitySerializer.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    static {
-        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-
-    }
-
-    @Override
-    public byte[] serialize(String topic, AggregateEntity data) {
-        String str = null;
-        try {
-            str = om.writeValueAsString(data.getData());
-        } catch (IOException e) {
-            logger.error("Kafka serialization failed during send!", e);
-        }
-        return stringSerializer.serialize(topic, str);
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
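
The serializer writes only getData() as JSON and then reuses StringSerializer for the bytes, so for example (values illustrative):

    AggregateEntity e = new AggregateEntity();
    e.add("user1");
    e.add(42);
    // serialize("someTopic", e) yields the UTF-8 bytes of: ["user1",42]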

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
deleted file mode 100644
index 919b92e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.persist.druid;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
-import org.apache.eagle.dataproc.impl.persist.IPersistService;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.*;
-import java.util.concurrent.Future;
-
-/**
- * TODO : support more general entity input
- * @since Dec 21, 2015
- *
- */
-public class KafkaPersistService implements IPersistService<AggregateEntity> {
-
-	private static final String ACKS = "acks";
-	private static final String RETRIES = "retries";
-	private static final String BATCH_SIZE = "batchSize";
-	private static final String LINGER_MS = "lingerMs";
-	private static final String BUFFER_MEMORY = "bufferMemory";
-	private static final String KEY_SERIALIZER = "keySerializer";
-	private static final String VALUE_SERIALIZER = "valueSerializer";
-	private static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
-	
-	private KafkaProducer<String, AggregateEntity> producer;
-	private final Config config;
-	private final SortedMap<String, String> streamTopicMap;
-	private final Properties props;
-	
-	/**
-	 * <pre>
-	 * props.put("bootstrap.servers", "localhost:4242");
-	 * props.put("acks", "all");
-	 * props.put("retries", 0);
-	 * props.put("batch.size", 16384);
-	 * props.put("linger.ms", 1);
-	 * props.put("buffer.memory", 33554432);
-	 * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-	 * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-	 * </pre>
-	 */
-	public KafkaPersistService(Config config) {
-		this.config = config;
-		Config kafkaConfig = config.getConfig("kafka");
-		if (kafkaConfig == null) {
-			throw new IllegalStateException("Druid persist service failed to find Kafka configuration!");
-		}
-		props = new Properties();
-		if (kafkaConfig.hasPath(BOOTSTRAP_SERVERS)) {
-			props.put("bootstrap.servers", kafkaConfig.getString(BOOTSTRAP_SERVERS));
-		}
-		if (kafkaConfig.hasPath(ACKS)) {
-			props.put(ACKS, kafkaConfig.getString(ACKS));
-		}
-		if (kafkaConfig.hasPath(RETRIES)) {
-			props.put(RETRIES, kafkaConfig.getInt(RETRIES));
-		}
-		if (kafkaConfig.hasPath(BATCH_SIZE)) {
-			props.put("batch.size", kafkaConfig.getInt(BATCH_SIZE));
-		}
-		if (kafkaConfig.hasPath(LINGER_MS)) {
-			props.put("linger.ms", kafkaConfig.getInt(LINGER_MS));
-		}
-		if (kafkaConfig.hasPath(BUFFER_MEMORY)) {
-			props.put("buffer.memory", kafkaConfig.getLong(BUFFER_MEMORY));
-		}
-		if (kafkaConfig.hasPath(KEY_SERIALIZER)) {
-			props.put("key.serializer", kafkaConfig.getString(KEY_SERIALIZER));
-		} else {
-			props.put("key.serializer", StringSerializer.class.getCanonicalName());
-		}
-//		if (kafkaConfig.hasPath(VALUE_SERIALIZER)) {
-//			props.put("value.serializer", kafkaConfig.getString(VALUE_SERIALIZER));
-//		}
-		props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName());
-
-		streamTopicMap = new TreeMap<>();
-		if (kafkaConfig.hasPath("topics")) {
-			Config topicConfig = kafkaConfig.getConfig("topics");
-			Set<Map.Entry<String, ConfigValue>> topics = topicConfig.entrySet();
-			for (Map.Entry<String, ConfigValue> t : topics) {
-				streamTopicMap.put(t.getKey(), (String) t.getValue().unwrapped());
-			}
-		}
-
-		producer = new KafkaProducer<>(props);
-	}
-
-	@Override
-	public boolean save(String stream, AggregateEntity apiEntity) throws Exception {
-		if (streamTopicMap.get(stream) != null) {
-			ProducerRecord<String, AggregateEntity> record = new ProducerRecord<>(streamTopicMap.get(stream), apiEntity);
-			Future<RecordMetadata> future = producer.send(record);
-			// TODO: check the send result via the returned future
-			return true;
-		}
-		return false;
-	}
-
-}
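
Pulling the recognized keys together, the kafka sub-config this service reads could look like the sketch below (all values illustrative). Note the underscore in bootstrap_servers and the camel-case batchSize/lingerMs/bufferMemory keys, which the constructor maps onto the dotted Kafka producer property names; the value serializer is always forced to AggregateEntitySerializer:

    kafka {
      bootstrap_servers = "localhost:9092"
      acks = "all"
      retries = 0
      batchSize = 16384
      lingerMs = 1
      bufferMemory = 33554432
      topics {
        defaultOutput = "eagle_aggregate"  # stream name -> Kafka topic (topic name assumed)
      }
    }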

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
deleted file mode 100644
index ab90ad7..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.typesafe.config.Config;
-
-/**
- * Normally a Storm spout is a dedicated component of a Storm topology; the underlying
- * spout implementation can be retrieved via the getSpout method.
- */
-public interface StormSpoutProvider {
-	public BaseRichSpout getSpout(Config context);
-}
\ No newline at end of file
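
As a sketch of the contract, a minimal hypothetical provider (not part of Eagle) that hands back a pre-built spout regardless of configuration:

    public class FixedSpoutProvider implements StormSpoutProvider {
        private final BaseRichSpout spout;

        public FixedSpoutProvider(BaseRichSpout spout) {
            this.spout = spout;
        }

        @Override
        public BaseRichSpout getSpout(Config context) {
            return spout; // ignores the config; real providers build the spout from it
        }
    }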

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
deleted file mode 100644
index 2b00b92..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/DataCollectionHDFSSpout.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.*;
-import java.util.zip.GZIPInputStream;
-
-import com.typesafe.config.Config;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.eagle.dataproc.impl.storm.hdfs.HDFSSourcedStormSpoutProvider.HDFSSpout;
-
-public class DataCollectionHDFSSpout extends HDFSSpout{
-
-	private static final long serialVersionUID = 8775646842131298552L;
-	private Config configContext;
-	private TopologyContext _context; 
-	SpoutOutputCollector _collector;
-	private Map<String, Boolean> processFileMap = null; 
-	private static final Logger LOG = LoggerFactory.getLogger(DataCollectionHDFSSpout.class);
-	
-	public DataCollectionHDFSSpout(Config configContext){
-		this.configContext = configContext;
-		processFileMap = new HashMap<String, Boolean>();
-		LOG.info("DataCollectionHDFSSpout called");
-		
-	}
-	
-	public void copyFiles(){
-		LOG.info("Inside copyFiles()");
-		Configuration conf = new Configuration(); 
-		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-		ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-		if(LOG.isDebugEnabled()) {
-			for (URL url : urls) {
-				LOG.debug(url.getFile());
-			}
-		}
-		// _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-       
-		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-		LOG.info("HDFS path: " + hdfsPath);
-		 
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("copyToPath: " + copyToPath);
-		String srcPathStr = "hdfs://" + hdfsConnectionStr + hdfsPath;
-		Path srcPath = new Path(srcPathStr); 
-		LOG.info("Starting HDFS copy");
-		LOG.info("srcPath: " + srcPath);
-		try {
-			FileSystem fs = srcPath.getFileSystem(conf);
-			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
-			CompressionCodec codec = codecFactory.getCodec(srcPath);
-			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-			*/
-			
-			Path destPath = new Path(copyToPath);
-			LOG.info("Destination path: " + destPath);
-			fs.copyToLocalFile(srcPath, destPath);
-			LOG.info("Copy to local succeed");
-			fs.close();
-							
-		} catch (IOException e) {
-			// log and continue; there is nothing to read if the copy failed
-			LOG.error("Failed to copy files from HDFS", e);
-		}
-		
-	}
-	
-	private List<String> getAllFiles(String root, int level){
-		
-		List<String> lists = new ArrayList<String>();
-		File rootFile = new File(root);
-		File[] tempList = rootFile.listFiles();
-		if(tempList == null)
-			return lists; 
-		
-		for(File temp:tempList){
-			if(temp.isDirectory())
-				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-			else{
-				if(temp.getName().endsWith(".gz") || temp.getName().endsWith(".csv"))
-					lists.add(temp.getAbsolutePath());
-			}
-		}
-		return lists;
-			
-	}
-	
-	public List<String> listFiles(){
-		
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("Reading from: " + copyToPath);
-		List<String> files = new ArrayList<String>();
-		files = getAllFiles(copyToPath, 0); 
-		return files;
-	}
-	
-	@Override
-	public void nextTuple() {
-		LOG.info("nextTuple called");
-		List<String> files = listFiles();
-		LOG.info("Files returned: " + files.size());
-		String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-		LOG.info("typeOfFile returned: " + typeOfFile);
-		
-		for(String fileName:files){
-			LOG.info("fileName: " + fileName);
-			LOG.info("processFileMap.get(fileName): " + processFileMap.get(fileName));
-			if(processFileMap.get(fileName) == null || processFileMap.get(fileName) == false){
-				processFileMap.put(fileName, true);
-				BufferedReader br = null; 
-				Reader decoder = null;
-				GZIPInputStream in = null; 
-				InputStream inStream = null;
-				
-				try{
-					if(typeOfFile.equalsIgnoreCase("GZIP")){
-						in = new GZIPInputStream(new FileInputStream(new File(fileName)));
-						decoder = new InputStreamReader(in);
-					}else if(typeOfFile.equalsIgnoreCase("CSV")){
-						inStream = new FileInputStream(new File(fileName)); 
-						decoder = new InputStreamReader(inStream);
-					}else{
-						LOG.error("No known file type specified");
-						continue;
-					}
-					
-					br = new BufferedReader(decoder);
-					int lineNo = 0; 
-					String line = "";
-					while((line = br.readLine())!= null){
-						++lineNo;
-			        	//String line = br.readLine();
-			        	//loggerHDFSSpout.info("line number " + lineNo + "is: " + line);
-			        	//if(line == null || line.equalsIgnoreCase(""))
-			        	//	break;
-			        	LOG.info("Emitting line from file: " + fileName);
-			        	//_collector.emit(new ValuesArray(line), lineNo);
-                        _collector.emit(Arrays.asList((Object)line));
-			        	LOG.info("Emitted line no: " + lineNo + " and line: " + line);
-						Utils.sleep(100);
-					}
-				}
-				catch (Exception e) {
-					// skip this file but keep the spout alive
-					LOG.error("Failed reading file: " + fileName, e);
-				} finally {
-					try {
-						if(br != null)
-							br.close();
-						if(decoder != null)
-							decoder.close();
-						if(in != null)
-							in.close();
-						if(inStream != null)
-							inStream.close();
-					} catch (IOException e) {
-						// closing failed; nothing more can be done here
-						LOG.error("Failed to close input streams", e);
-					}
-				}
-			}else{
-				LOG.info("File already processed, skipping: " + fileName);
-				//Utils.sleep(10000);
-			}
-			
-		}
-		
-	}
-	
-	public void fail(Object msgId) {
-	    int transactionId = (Integer) msgId;
-	    LOG.info(transactionId + " failed");
-	}
-	
-	public void ack(Object msgId) {
-	    int transactionId = (Integer) msgId;
-	    LOG.info(transactionId + " acknowledged");
-	}
-
-	@Override
-	public void open(Map arg0, TopologyContext context,
-			SpoutOutputCollector collector) {
-		 _collector = collector;
-		 _context = context;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// declare the single output field: one raw line per tuple
-		declarer.declare(new Fields("line"));
-	}
-}
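
The spout is driven entirely by dataSourceConfig; a fragment covering the keys it reads might look like this (values illustrative). Note that this class spells the connection key hdfsConnnection, with three n's, while UserProfileGenerationHDFSSpout below uses hdfsConnection:

    dataSourceConfig {
      hdfsConnnection = "namenode.example.com:8020"
      hdfsPath = "/eagle/input"
      copyToPath = "/tmp/eagle/input"
      fileFormat = "GZIP"   # or "CSV"; anything else is skipped in nextTuple()
    }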

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
deleted file mode 100644
index 2323f39..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.dataproc.impl.storm.hdfs;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.topology.base.BaseRichSpout;
-
-public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider {
-	private static final Logger LOG = LoggerFactory.getLogger(HDFSSourcedStormSpoutProvider.class);
-	
-	public abstract static class HDFSSpout extends BaseRichSpout{
-		public abstract void copyFiles(); 
-		public void fail(Object msgId) {
-		    int transactionId = (Integer) msgId;
-		    LOG.info(transactionId + " failed");
-		}
-		
-		public void ack(Object msgId) {
-		    int transactionId = (Integer) msgId;
-		    LOG.info(transactionId + " acknowledged");
-		}
-		
-		public static HDFSSpout getHDFSSpout(String conf, Config configContext){
-			if(conf.equalsIgnoreCase("data collection")){
-				return new DataCollectionHDFSSpout(configContext); 
-			}
-			if(conf.equalsIgnoreCase("user profile generation")){
-				return new UserProfileGenerationHDFSSpout(configContext); 
-			}
-			return null;
-		}
-	}
-	
-	@Override
-	public BaseRichSpout getSpout(Config context){
-		LOG.info("GetHDFSSpout called");
-		String typeOperation = context.getString("dataSourceConfig.typeOperation");
-		HDFSSpout spout = HDFSSpout.getHDFSSpout(typeOperation, context);
-		spout.copyFiles();
-		return spout;
-	}
-}
\ No newline at end of file
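
getSpout dispatches on a single config key, e.g. (value illustrative):

    dataSourceConfig {
      typeOperation = "data collection"   # or "user profile generation"
    }

Note that getSpout invokes copyFiles() before returning, so the HDFS copy runs on the machine assembling the topology rather than in the spout's open() on a worker.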

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
deleted file mode 100644
index e07ee81..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.hdfs;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.StreamingProcessConstants;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.esotericsoftware.minlog.Log;
-
-public class UserProfileGenerationHDFSSpout extends HDFSSourcedStormSpoutProvider.HDFSSpout {
-
-	private static final long serialVersionUID = 2274234104008894386L;
-	private Config configContext;
-	private TopologyContext _context; 
-	SpoutOutputCollector _collector;
-	
-	public class UserProfileData implements Serializable{
-		private static final long serialVersionUID = -3315860110144736840L;
-		private String user; 
-		private List<String> dateTime = new ArrayList<String>(); 
-		private List<Integer> hrInDay = new ArrayList<Integer>(); 
-		private List<String> line = new ArrayList<String>();
-		
-		public String getUser() {
-			return user;
-		}
-		public void setUser(String user) {
-			this.user = user;
-		}
-		public String getDateTime(int index) {
-			return dateTime.get(index);
-		}
-		public List<String> getDateTimes() {
-			return dateTime;
-		}
-		public void setDateTime(String dateTime) {
-			this.dateTime.add(dateTime);
-		}
-		public int getHrInDay(int index) {
-			return hrInDay.get(index);
-		}
-		public List<Integer> getHrsInDay() {
-			return hrInDay;
-		}
-		public void setHrInDay(int hrInDay) {
-			this.hrInDay.add(hrInDay);
-		}
-		public String getLine(int index) {
-			return line.get(index);
-		}
-		public List<String> getLines() {
-			return line;
-		}
-		public void setLine(String line) {
-			this.line.add(line);
-		} 
-		
-	}
-	
-	private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class);
-	
-	public UserProfileGenerationHDFSSpout(Config configContext){
-		this.configContext = configContext;
-		LOG.info("UserProfileGenerationHDFSSpout called");
-	}
-	
-	public void copyFiles(){
-		LOG.info("Inside copyFiles()");
-		//Configuration conf = new Configuration();
-		JobConf conf = new JobConf();
-		// _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________
-		ClassLoader cl = ClassLoader.getSystemClassLoader();
-        URL[] urls = ((URLClassLoader)cl).getURLs();
-        if(LOG.isDebugEnabled()) {
-			for (URL url : urls) {
-				LOG.debug(url.getFile());
-			}
-		}
-		// _________________________________________
-        String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection");
-        LOG.info("HDFS connection string: " + hdfsConnectionStr);
-       
-		String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath");
-		LOG.info("HDFS path: " + hdfsPath);
-		 
-		String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-		LOG.info("copyToPath: " + copyToPath);
-		String srcPathStr = "hdfs://" + hdfsConnectionStr + hdfsPath;
-		Path srcPath = new Path(srcPathStr); 
-		LOG.info("Starting HDFS copy");
-		LOG.info("srcPath: " + srcPath);
-		try {
-			FileSystem fs = srcPath.getFileSystem(conf);
-			/*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); 
-			CompressionCodec codec = codecFactory.getCodec(srcPath);
-			DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath)));
-			*/
-			
-			Path destPath = new Path(copyToPath);
-			LOG.info("Destination path: " + destPath);
-			String userListFileName = configContext.getString("dataSourceConfig.userList");
-			//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-			List<String> userList = getUser(userListFileName);
-			for(String user:userList){
-				Path finalSrcPath = new Path(srcPath, user); // Path(parent, child); getName() would drop the parent directories
-				fs.copyToLocalFile(finalSrcPath, destPath);
-			}
-			LOG.info("Copy to local succeed");
-			fs.close();
-							
-		} catch (IOException e) {
-			// log and continue without the copied files
-			LOG.error("Failed to copy user files from HDFS", e);
-		}
-		
-	}
-	
-	private List<String> getAllFiles(String root, int level){
-		
-		List<String> lists = new ArrayList<String>();
-		File rootFile = new File(root);
-		File[] tempList = rootFile.listFiles();
-		if(tempList == null)
-			return lists; 
-		
-		for(File temp:tempList){
-			if(temp.isDirectory())
-				lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level));
-			else{
-				if(temp.getName().endsWith(".csv"))
-					lists.add(temp.getAbsolutePath());
-			}
-		}
-		return lists;
-			
-	}
-	
-	public List<String> listFiles(String path){
-		
-		LOG.info("Reading from: " + path);
-		List<String> files = new ArrayList<String>();
-		files = getAllFiles(path, 0); 
-		return files;
-	}
-	
-	private List<String> getUser(String listFileName){
-		List<String> userList = new ArrayList<String>();
-		BufferedReader reader = null; 
-		try{
-			InputStream is = getClass().getResourceAsStream(listFileName);
-			reader = new BufferedReader(new InputStreamReader(is));
-			String line = ""; 
-			while((line = reader.readLine()) != null){
-				userList.add(line);
-				LOG.info("User added:" + line);
-			}
-		} catch (Exception e) {
-			LOG.error("Failed to read user list: " + listFileName, e);
-		} finally {
-			try {
-				if(reader != null)
-					reader.close();
-			} catch (IOException e) {
-				// closing failed; nothing more can be done here
-				LOG.error("Failed to close user list reader", e);
-			}
-		}
-		return userList;
-	}
-	
-	@Override
-	public void nextTuple() {
-		LOG.info("nextTuple called");
-		
-		String userListFileName = configContext.getString("dataSourceConfig.userList");
-
-		//loggerHDFSSpout.info("userListFileName: " + userListFileName);
-		List<String> userList = getUser(userListFileName);
-		//loggerHDFSSpout.info("user list size:" + userList.size());
-		for(String user: userList){
-			LOG.info("Processing user: " + user);
-			String copyToPath = configContext.getString("dataSourceConfig.copyToPath");
-			//loggerHDFSSpout.info("copyToPath: " + copyToPath);
-			
-			copyToPath += "/" + user;
-			List<String> files = listFiles(copyToPath);
-			LOG.info("Files returned: " + files.size());
-			String typeOfFile = configContext.getString("dataSourceConfig.fileFormat");
-			//loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile);
-			UserProfileData usersProfileDataset = new UserProfileData();
-				
-			for(String fileName:files){
-				LOG.info("FileName: " + fileName);
-				usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf(".")));
-				BufferedReader br = null; 
-				Reader decoder = null;
-				InputStream inStream = null;
-				
-				try{
-					inStream = new FileInputStream(new File(fileName));
-					decoder = new InputStreamReader(inStream);
-					br = new BufferedReader(decoder);
-					int lineNo = 0; 
-					String line = "";
-					while((line = br.readLine())!= null){
-						boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader");
-						//loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader);
-						if(containsFileHeader && lineNo == 0){
-							// ignore the header column
-							lineNo++;
-							continue;
-						}
-			        	//loggerHDFSSpout.info("emitting line from file: " + fileName);
-			        	
-						usersProfileDataset.setLine(line);
-						usersProfileDataset.setHrInDay(lineNo);
-			        	lineNo++;
-					}
-				}
-				catch (Exception e) {
-					Log.error("File operation failed", e);
-					throw new IllegalStateException(e);
-				}finally{
-					try {
-						if(br != null)
-							br.close();
-						if(decoder != null)
-							decoder.close();
-						if(inStream != null)
-							inStream.close();
-					} catch (IOException e) {
-						// closing failed; nothing more can be done here
-						LOG.error("Failed to close file readers", e);
-					}
-				}
-			}
-			usersProfileDataset.setUser(user);
-			_collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset));
-        	LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size());
-			Utils.sleep(1000);
-		}
-		this.close();
-	}
-	
-	@Override
-	public void open(Map arg0, TopologyContext context,
-			SpoutOutputCollector collector) {
-		 _collector = collector;
-		 _context = context;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// declare the (partition key, stream name, attribute map) output fields
-		declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP));
-	}
-	
-	
-}
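
A note on the spout code above: its per-file reader management (nested finally/close blocks) predates try-with-resources. A minimal sketch of the same read loop in that style follows; UserProfileData and the header flag mirror the deleted code, while the helper name readProfileFile is hypothetical.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Reads one per-user file into the dataset, optionally skipping a header row.
    private void readProfileFile(String fileName, UserProfileData dataset, boolean containsFileHeader) {
        try (BufferedReader br = Files.newBufferedReader(Paths.get(fileName), StandardCharsets.UTF_8)) {
            int lineNo = 0;
            String line;
            while ((line = br.readLine()) != null) {
                if (containsFileHeader && lineNo == 0) {
                    lineNo++;            // skip the header row once
                    continue;
                }
                dataset.setLine(line);   // accumulate this hour's line
                dataset.setHrInDay(lineNo);
                lineNo++;
            }
        } catch (IOException e) {
            throw new IllegalStateException("File operation failed for " + fileName, e);
        }
    }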

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
deleted file mode 100644
index 416aaa3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class JsonSerializer implements Serializer<Object> {
-    private final StringSerializer stringSerializer = new StringSerializer();
-    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    static {
-        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        stringSerializer.configure(configs,isKey);
-    }
-
-    @Override
-    public byte[] serialize(String topic, Object data) {
-        String str = null;
-        try {
-            str = om.writeValueAsString(data);
-        } catch (IOException e) {
-            logger.error("Failed to serialize Kafka message to JSON for topic " + topic, e);
-        }
-        return stringSerializer.serialize(topic, str);
-    }
-
-    @Override
-    public void close() {
-        stringSerializer.close();
-    }
-}
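
JsonSerializer above plugs into the standard kafka-clients producer configuration: whatever object is passed as the record value is rendered to a JSON string and then encoded by the embedded StringSerializer. A hedged usage sketch; the broker address, topic, and payload are illustrative.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class JsonSerializerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // example broker
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer");

            KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
            // the Map value is serialized to {"host":"node-1"} by JsonSerializer
            producer.send(new ProducerRecord<String, Object>("example_topic", "k1",
                    Collections.singletonMap("host", "node-1")));
            producer.close();
        }
    }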

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
deleted file mode 100644
index 0107a9b..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import java.util.Arrays;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-
-public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
-
-	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
-	}
-
-    private String configPrefix = "dataSourceConfig";
-
-    public KafkaSourcedSpoutProvider(){}
-
-    public KafkaSourcedSpoutProvider(String prefix){
-        this.configPrefix = prefix;
-    }
-
-	@Override
-	public BaseRichSpout getSpout(Config config){
-        Config context = config;
-        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
-		// Kafka topic
-		String topic = context.getString("topic");
-		// Kafka consumer group id
-		String groupId = context.getString("consumerGroupId");
-		// Kafka fetch size
-		int fetchSize = context.getInt("fetchSize");
-		// Kafka deserializer class
-		String deserClsName = context.getString("deserializerClass");
-		// Kafka broker zk connection
-		String zkConnString = context.getString("zkConnection");
-		// transaction zkRoot
-		String zkRoot = context.getString("transactionZKRoot");
-
-        LOG.info(String.format("Using Kafka topic: %s", topic));
-
-        String brokerZkPath = null;
-        if(context.hasPath("brokerZkPath")) {
-            brokerZkPath = context.getString("brokerZkPath");
-        }
-
-        BrokerHosts hosts;
-        if(brokerZkPath == null) {
-            hosts = new ZkHosts(zkConnString);
-        } else {
-            hosts = new ZkHosts(zkConnString, brokerZkPath);
-        }
-        
-		SpoutConfig spoutConfig = new SpoutConfig(hosts, 
-				topic,
-				zkRoot + "/" + topic,
-				groupId);
-		
-		// transaction zkServers
-		spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
-		// transaction zkPort
-		spoutConfig.zkPort = context.getInt("transactionZKPort");
-		// transaction update interval
-		spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
-		// Kafka fetch size
-		spoutConfig.fetchSizeBytes = fetchSize;		
-		// "startOffsetTime" is intended for testing only and should not be set in production
-		if (context.hasPath("startOffsetTime")) {
-			spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
-		}		
-		// "forceFromStart" is intended for testing only and should not be set in production
-		if (context.hasPath("forceFromStart")) {
-			spoutConfig.forceFromStart = context.getBoolean("forceFromStart");
-		}
-		
-		spoutConfig.scheme = getStreamScheme(deserClsName, context);
-        return new KafkaSpout(spoutConfig);
-	}
-}
\ No newline at end of file
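
The keys read inside getSpout() above amount to a configuration contract under the dataSourceConfig prefix. Below is a sketch of wiring the provider from Java; all values, including the deserializer class name, are examples only.

    import backtype.storm.topology.base.BaseRichSpout;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;

    public class KafkaSpoutWiringExample {
        public static void main(String[] args) {
            // every key below is read in getSpout(); values are illustrative
            Config config = ConfigFactory.parseString(
                    "dataSourceConfig {\n"
                    + "  topic = example_topic\n"
                    + "  consumerGroupId = eagle.example.consumer\n"
                    + "  fetchSize = 1048576\n"
                    + "  deserializerClass = com.example.MyMessageDeserializer\n" // hypothetical
                    + "  zkConnection = \"localhost:2181\"\n"
                    + "  transactionZKRoot = /consumers\n"
                    + "  transactionZKServers = localhost\n"
                    + "  transactionZKPort = 2181\n"
                    + "  transactionStateUpdateMS = 2000\n"
                    + "}\n");
            BaseRichSpout spout = new KafkaSourcedSpoutProvider().getSpout(config);
        }
    }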

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
deleted file mode 100644
index 15401fd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-import java.lang.reflect.Constructor;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * This scheme defines how a Kafka message is deserialized and which output field
- * names are declared for the Storm stream. It covers:
- * 1. the Kafka message deserializer class, since the data source is Kafka
- * 2. the output field declaration
- */
-public class KafkaSourcedSpoutScheme implements Scheme {
-	protected SpoutKafkaMessageDeserializer deserializer;
-	
-	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
-		try{
-			Properties prop = new Properties();
-            if(context.hasPath("eagleProps")) {
-                prop.putAll(context.getObject("eagleProps"));
-            }
-			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);
-			deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
-		}catch(Exception ex){
-			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
-		}
-	}
-	
-	@Override
-	public List<Object> deserialize(byte[] ser) {
-		Object tmp = deserializer.deserialize(ser);
-		if(tmp == null)
-			return null;
-		// the following tasks are executed within the same process of kafka spout
-		return Arrays.asList(tmp);
-	}
-
-    /**
-     * Defaults to the single field f0; override this method if the scheme requires a different schema.
-     *
-     * TODO: Handle the schema with KeyValue based structure
-     *
-     * @return Fields
-     */
-	@Override
-	public Fields getOutputFields() {
-        return new Fields(NameConstants.FIELD_PREFIX()+"0");
-	}
-}
\ No newline at end of file
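
The TODO in getOutputFields() above anticipates key/value payloads. A hedged sketch of such an override follows, assuming a deserializer that returns a two-element [key, value] list; both methods must change together so the declared fields match the emitted tuple.

    import java.util.List;

    import backtype.storm.tuple.Fields;
    import com.typesafe.config.Config;
    import org.apache.eagle.datastream.utils.NameConstants;

    // Hypothetical subclass for a key/value payload: declares f0 and f1
    // instead of the single default field.
    public class KeyValueSpoutScheme extends KafkaSourcedSpoutScheme {
        public KeyValueSpoutScheme(String deserClsName, Config context) {
            super(deserClsName, context);
        }

        @Override
        @SuppressWarnings("unchecked")
        public List<Object> deserialize(byte[] ser) {
            // assumes the deserializer already yields a [key, value] list
            return (List<Object>) deserializer.deserialize(ser);
        }

        @Override
        public Fields getOutputFields() {
            return new Fields(NameConstants.FIELD_PREFIX() + "0",
                    NameConstants.FIELD_PREFIX() + "1");
        }
    }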

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
deleted file mode 100644
index 76ca458..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import java.io.Serializable;
-
-public interface SpoutKafkaMessageDeserializer extends Serializable {
-	Object deserialize(byte[] bytes);
-}
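
KafkaSourcedSpoutScheme instantiates the configured class reflectively through a single Properties-argument constructor, so every implementation of this interface must expose exactly that constructor. A minimal hypothetical implementation:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class StringMessageDeserializer implements SpoutKafkaMessageDeserializer {
        private final Properties props;

        // required by the reflective construction in KafkaSourcedSpoutScheme
        public StringMessageDeserializer(Properties props) {
            this.props = props;
        }

        @Override
        public Object deserialize(byte[] bytes) {
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }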

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
deleted file mode 100644
index 9c1ab68..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.dataproc.impl.storm.partition;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class CustomPartitionGrouping implements CustomStreamGrouping {
-
-    public List<Integer> targetTasks;
-    public PartitionStrategy strategy;
-
-    public CustomPartitionGrouping(PartitionStrategy strategy) {
-        this.strategy = strategy;
-    }
-
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        this.targetTasks = new ArrayList<>(targetTasks);
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        int numTasks = targetTasks.size();
-        int targetTaskIndex = strategy.balance((String)values.get(0), numTasks);
-        return Arrays.asList(targetTasks.get(targetTaskIndex));
-    }
-}
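
chooseTasks() above delegates the routing decision to PartitionStrategy.balance(key, numTasks). A sketch of a compatible strategy, assuming balance(String, int) is the only method the interface requires:

    import org.apache.eagle.partition.PartitionStrategy;

    public class HashPartitionStrategy implements PartitionStrategy {
        @Override
        public int balance(String key, int numTasks) {
            // stable index in [0, numTasks) derived from the partition key
            return Math.abs(key.hashCode() % numTasks);
        }
    }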

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
deleted file mode 100644
index f9515f5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.zookeeper;
-
-import java.io.Serializable;
-
-public class ZKStateConfig implements Serializable {
-    private static final long serialVersionUID = 1L;
-    public String zkQuorum;
-    public String zkRoot;
-    public int zkSessionTimeoutMs;
-    public int zkRetryTimes;
-    public int zkRetryInterval;
-}
\ No newline at end of file
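
ZKStateConfig above is a bare field holder that callers populate by hand. A small sketch of filling it from a typesafe Config; the key names under zkStateConfig are illustrative assumptions, not keys the class itself defines.

    import com.typesafe.config.Config;

    public class ZKStateConfigFactory {
        public static ZKStateConfig fromConfig(Config config) {
            ZKStateConfig zkConfig = new ZKStateConfig();
            zkConfig.zkQuorum = config.getString("zkStateConfig.zkQuorum");
            zkConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
            zkConfig.zkSessionTimeoutMs = config.getInt("zkStateConfig.zkSessionTimeoutMs");
            zkConfig.zkRetryTimes = config.getInt("zkStateConfig.zkRetryTimes");
            zkConfig.zkRetryInterval = config.getInt("zkStateConfig.zkRetryInterval");
            return zkConfig;
        }
    }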

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
deleted file mode 100644
index 993a4a2..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-public class JavaMapperStormExecutor extends BaseRichBolt{
-    public static class e1 extends JavaMapperStormExecutor {
-        public e1(JavaMapper mapper){
-            super(1, mapper);
-        }
-    }
-    public static class e2 extends JavaMapperStormExecutor {
-        public e2(JavaMapper mapper){
-            super(2, mapper);
-        }
-    }
-    public static class e3 extends JavaMapperStormExecutor {
-        public e3(JavaMapper mapper){
-            super(3, mapper);
-        }
-    }
-    public static class e4 extends JavaMapperStormExecutor {
-        public e4(JavaMapper mapper){
-            super(4, mapper);
-        }
-    }
-
-    private JavaMapper mapper;
-    private OutputCollector collector;
-    private int numOutputFields;
-    public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){
-        this.numOutputFields = numOutputFields;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        List<Object> ret = mapper.map(input.getValues());
-        this.collector.emit(ret);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> fields = new ArrayList<String>();
-        for(int i=0; i<numOutputFields; i++){
-            fields.add(NameConstants.FIELD_PREFIX() + i);
-        }
-        declarer.declare(new Fields(fields));
-    }
-}
\ No newline at end of file
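
The e1..e4 helpers above fix the declared output arity for a wrapped JavaMapper. A hypothetical mapper sketch, assuming JavaMapper exposes the map(List<Object>) method invoked in execute(); the returned list size must match the chosen eN wrapper.

    import java.util.Arrays;
    import java.util.List;

    public class UpperCaseMapper implements JavaMapper {
        @Override
        public List<Object> map(List<Object> input) {
            // two output values, to pair with new JavaMapperStormExecutor.e2(...)
            return Arrays.asList(String.valueOf(input.get(0)).toUpperCase(), input.get(1));
        }
    }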

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
deleted file mode 100644
index a485d76..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.datastream;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import com.typesafe.config.Config;
-import scala.Tuple2;
-import scala.Tuple3;
-
-public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
-    private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
-    private String streamName;
-    public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){
-        this.delegate = delegate;
-        this.streamName = streamName;
-    }
-    @Override
-    public void prepareConfig(Config config) {
-        delegate.prepareConfig(config);
-    }
-
-    @Override
-    public void init() {
-        delegate.init();
-    }
-
-    @Override
-    public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) {
-        Collector delegateCollector = new Collector(){
-            @Override
-            public void collect(Object o) {
-                Tuple2 tuple2 = (Tuple2)o;
-                collector.collect(new Tuple3(tuple2._1, streamName, tuple2._2));
-            }
-        };
-        delegate.flatMap(input, delegateCollector);
-    }
-    
-    public JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> getDelegate() {
-    	return delegate;
-    }
-
-}
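
In effect, the wrapper turns each (partitionKey, attributes) pair emitted by the delegate into a (partitionKey, streamName, attributes) triple so downstream alert executors can tell streams apart. A usage fragment with placeholder names; someExecutor stands in for any existing JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>>.

    JavaStormExecutorForAlertWrapper wrapped =
            new JavaStormExecutorForAlertWrapper(someExecutor, "exampleEventStream");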

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
deleted file mode 100644
index 04b4bed..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * @since 12/7/15
- */
-class JavaReflections {
-    @SuppressWarnings("unchecked")
-    public static Class<?> getGenericTypeClass(final Object obj,int index) {
-        return (Class<?>) ((ParameterizedType) obj
-                .getClass()
-                .getGenericSuperclass()).getActualTypeArguments()[index];
-    }
-}
\ No newline at end of file
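
A short illustration of the helper above; it is package-private, so this sketch assumes the caller lives in org.apache.eagle.datastream.utils. The generic argument survives erasure only because the subclass names it in its superclass declaration.

    import java.util.ArrayList;

    class StringList extends ArrayList<String> {
    }

    // elsewhere in the same package:
    Class<?> elementType = JavaReflections.getGenericTypeClass(new StringList(), 0);
    // elementType is java.lang.String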

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
deleted file mode 100644
index 90e59cf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used to create and manage execution environments,
- * and also handles shared concerns such as configuration and arguments for them.
- *
- * Note: this factory class should not know about any concrete implementation such as Storm or Spark.
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  type storm = StormExecutionEnvironment
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
- * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def getWithConfig[T <: ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    getWithConfig[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  def get[T<:ExecutionEnvironment](clazz:Class[T], config:Config):T ={
-    get[T](config,clazz)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file
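
The "java style" overloads above make the factory usable without Scala implicits. A minimal sketch; ConfigFactory.load() reads application.conf from the classpath, mirroring the no-argument Scala path.

    import com.typesafe.config.ConfigFactory;
    import org.apache.eagle.datastream.ExecutionEnvironments;
    import org.apache.eagle.datastream.storm.StormExecutionEnvironment;

    public class EnvironmentExample {
        public static void main(String[] args) {
            StormExecutionEnvironment env = ExecutionEnvironments.get(
                    StormExecutionEnvironment.class, ConfigFactory.load());
            // assemble streams on env here, then submit the topology
        }
    }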

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
deleted file mode 100644
index 46f4738..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.core
-
-trait AbstractTopologyCompiler{
-  def buildTopology : AbstractTopologyExecutor
-}


