Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3E995200B68 for ; Fri, 19 Aug 2016 19:24:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3D116160AAB; Fri, 19 Aug 2016 17:24:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1962B160A79 for ; Fri, 19 Aug 2016 19:24:11 +0200 (CEST) Received: (qmail 73308 invoked by uid 500); 19 Aug 2016 17:24:11 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 73299 invoked by uid 99); 19 Aug 2016 17:24:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 17:24:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 990D21A5313 for ; Fri, 19 Aug 2016 17:24:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id HV22Bk6WlUqs for ; Fri, 19 Aug 2016 17:24:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 7370D5F30E for ; Fri, 19 Aug 2016 17:23:58 +0000 (UTC) Received: (qmail 71292 invoked by uid 99); 19 Aug 2016 17:23:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 17:23:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87F6DE00D6; Fri, 19 Aug 2016 17:23:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yonzhang2012@apache.org To: commits@eagle.incubator.apache.org Date: Fri, 19 Aug 2016 17:23:57 -0000 Message-Id: <1c3244b70d8c449db9982fc179b5b849@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-eagle git commit: EAGLE-477 eagle-data-process project clean up to contain only common processing eagle-data-process project clean up to contain only common processing archived-at: Fri, 19 Aug 2016 17:24:14 -0000 Repository: incubator-eagle Updated Branches: refs/heads/develop d6ec142d3 -> b31bac50b http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java new file mode 100644 index 0000000..7d69bbd --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/StormSpoutProvider.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.dataproc.impl.storm; + +import backtype.storm.topology.base.BaseRichSpout; + +import com.typesafe.config.Config; + +/** + * Normally storm spout is a special part of storm topology and it is implemented in underlying spout implementation + * which can be retrieved from getSpout method. + */ +public interface StormSpoutProvider { + BaseRichSpout getSpout(Config context); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java new file mode 100644 index 0000000..de06105 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/ValuesArray.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.dataproc.impl.storm; + +import java.util.ArrayList; + +/** + * multiple datapoints are stored within one ValuesArray object + * sent out + */ +public class ValuesArray extends ArrayList{ + private static final long serialVersionUID = -8218427810421668178L; + + public ValuesArray() { + + } + + public ValuesArray(Object... vals) { + super(vals.length); + for(Object o: vals) { + add(o); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java new file mode 100644 index 0000000..7454cc2 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Since 6/8/16. + */ +public class KafkaSpoutProvider implements StormSpoutProvider { + private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutProvider.class); + private final static String DEFAULT_CONFIG_PREFIX = "dataSourceConfig"; + private final static String DEFAULT_CONSUMER_GROUP_ID = "eagleConsumer"; + private final static String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers"; + + private String configPrefix = DEFAULT_CONFIG_PREFIX; + + public KafkaSpoutProvider(){} + + public KafkaSpoutProvider(String prefix){ + this.configPrefix = prefix; + } + + @Override + public BaseRichSpout getSpout(Config config){ + Config context = config; + if(this.configPrefix!=null) context = config.getConfig(configPrefix); + + // the following is for fetching data from one topic + // Kafka topic + String topic = context.getString("topic"); + // Kafka broker zk connection + String zkConnString = context.getString("zkConnection"); + // Kafka fetch size + int fetchSize = context.hasPath("fetchSize") ? context.getInt("fetchSize") : 1048586; + LOG.info(String.format("Use topic : %s, zkConnection : %s , fetchSize : %d", topic, zkConnString, fetchSize)); + + /* + the following is for recording offset for processing the data + the zk path to store current offset is comprised of the following + offset zkPath = zkRoot + "/" + topic + "/" + consumerGroupId + "/" + partition_Id + + consumerGroupId is for differentiating different consumers which consume the same topic + */ + // transaction zkRoot + String zkRoot = context.hasPath("transactionZKRoot") ? context.getString("transactionZKRoot") : DEFAULT_TRANSACTION_ZK_ROOT; + // Kafka consumer group id + String groupId = context.hasPath("consumerGroupId") ? context.getString("consumerGroupId") : DEFAULT_CONSUMER_GROUP_ID; + String brokerZkPath = context.hasPath("brokerZkPath") ? context.getString("brokerZkPath") : null; + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + String[] txZkServers = context.hasPath("txZkServers") ? context.getString("txZkServers").split(",") : new String[]{"localhost:2181"}; + spoutConfig.zkServers = Arrays.asList(txZkServers).stream().map(server -> server.split(":")[0]).collect(Collectors.toList()); + // transaction zkPort + spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]); + LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort); + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.hasPath("transactionStateUpdateMS") ? context.getLong("transactionStateUpdateMS") : 2000; + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + // "startOffsetTime" is for test usage, prod should not use this + if (context.hasPath("startOffsetTime")) { + spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); + } + // "forceFromStart" is for test usage, prod should not use this + if (context.hasPath("forceFromStart")) { + spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); + } + + if (context.hasPath("schemeCls")) { + try { + Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance(); + spoutConfig.scheme = new SchemeAsMultiScheme(s); + }catch(Exception ex){ + LOG.error("error instantiating scheme object"); + throw new IllegalStateException(ex); + } + }else{ + String err = "schemeCls must be present"; + LOG.error(err); + throw new IllegalStateException(err); + } + return new KafkaSpout(spoutConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java new file mode 100644 index 0000000..76ca458 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/SpoutKafkaMessageDeserializer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.dataproc.impl.storm.kafka; + +import java.io.Serializable; + +public interface SpoutKafkaMessageDeserializer extends Serializable{ + public Object deserialize(byte[] arg0); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java new file mode 100644 index 0000000..09193de --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/CustomPartitionGrouping.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.task.WorkerTopologyContext; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CustomPartitionGrouping implements CustomStreamGrouping { + + public List targetTasks; + public PartitionStrategy strategy; + + public CustomPartitionGrouping(PartitionStrategy strategy) { + this.strategy = strategy; + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { + this.targetTasks = new ArrayList<>(targetTasks); + } + + @Override + public List chooseTasks(int taskId, List values) { + int numTasks = targetTasks.size(); + int targetTaskIndex = strategy.balance((String)values.get(0), numTasks); + return Arrays.asList(targetTasks.get(targetTaskIndex)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java new file mode 100644 index 0000000..d54fab9 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/DataDistributionDao.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +import java.io.Serializable; +import java.util.List; + +public interface DataDistributionDao extends Serializable { + + List fetchDataDistribution(long startTime, long endTime) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java new file mode 100644 index 0000000..ca08bb1 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionAlgorithm.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public interface PartitionAlgorithm extends Serializable { + Map partition(List weights, int k); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java new file mode 100644 index 0000000..ae05a6c --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +import java.io.Serializable; + +public interface PartitionStrategy extends Serializable { + + int balance(String key, int buckNum); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java new file mode 100644 index 0000000..e36e75c --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/PartitionStrategyImpl.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +import org.apache.commons.lang3.time.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +public class PartitionStrategyImpl implements PartitionStrategy { + + public DataDistributionDao dao; + public PartitionAlgorithm algorithm; + public Map routingTable; + public long lastRefreshTime; + public long refreshInterval; + public long timeRange; + public static long DEFAULT_TIME_RANGE = 2 * DateUtils.MILLIS_PER_DAY; + public static long DEFAULT_REFRESH_INTERVAL = 2 * DateUtils.MILLIS_PER_HOUR; + private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class); + + public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval, long timeRange) { + this.dao = dao; + this.algorithm = algorithm; + this.refreshInterval = refreshInterval; + this.timeRange = timeRange; + } + + public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) { + this(dao, algorithm, DEFAULT_REFRESH_INTERVAL, DEFAULT_TIME_RANGE); + } + + public boolean needRefresh() { + if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) { + lastRefreshTime = System.currentTimeMillis(); + return true; + } + return false; + } + + public Map generateRoutingTable(int buckNum) { + try { + long currentTime = System.currentTimeMillis(); + List weights = dao.fetchDataDistribution(currentTime - timeRange, currentTime); + routingTable = algorithm.partition(weights, buckNum); + return routingTable; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public int balance(String key, int buckNum) { + if (needRefresh()) { + LOG.info("Going to refresh routing table"); + routingTable = generateRoutingTable(buckNum); + LOG.info("Finish refresh routing table"); + } + if (routingTable.containsKey(key)) { + return routingTable.get(key); + } + else { + return Math.abs(key.hashCode()) % buckNum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java new file mode 100644 index 0000000..314812a --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/Weight.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.dataproc.impl.storm.partition; + +public class Weight { + public String key; + public Double value; + + public Weight(String key, Double value) { + this.key = key; + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java new file mode 100644 index 0000000..f9515f5 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.dataproc.impl.storm.zookeeper; + +import java.io.Serializable; + +public class ZKStateConfig implements Serializable { + private static final long serialVersionUID = 1L; + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/resources/application.conf b/eagle-core/eagle-data-process/src/main/resources/application.conf new file mode 100644 index 0000000..c386a71 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/resources/application.conf @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "envContextConfig" : { + "env" : "storm", + "mode" : "local", + "topologyName" : "SpadesMonitorTopology", + "stormConfigFile" : "spades-monitor-storm.yaml", + "parallelismConfig" : { + "SpadesMonitorStream" : 1, + "SpadesMonitorExecutor*" : 1 + } + }, + "dataSourceConfig": { + "topic" : "spades_monitor_sandbox", + "zkConnection" : "sandbox.hortonworks.com:2181", + "zkConnectionTimeoutMS" : 15000, + "consumerGroupId" : "eagle.consumer", + "fetchSize" : 1048586, + "transactionZKServers" : "sandbox.hortonworks.com", + "transactionZKPort" : 2181, + "transactionZKRoot" : "/consumers", + "transactionStateUpdateMS" : 2000 + }, + "alertExecutorConfigs" : { + "SpadesMonitorExecutor" : { + "parallelism" : 1, + "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" + "needValidation" : "true" + } + }, + "eagleProps" : { + "site" : "sandbox", + "application": "SpadesMonitor", + "dataJoinPollIntervalSec" : 30, + "mailHost" : "mailHost.com", + "mailSmtpPort":"25", + "mailDebug" : "true", + "balancePartitionEnabled" : true, + #"partitionRefreshIntervalInMin" : 60, + #"kafkaStatisticRangeInMin" : 60, + "eagleService": { + "host": "localhost", + "port": 9099, + "username": "admin", + "password": "secret" + } + "readHdfsUserCommandPatternFrom" : "file" + }, + "dynamicConfigSource" : { + "enabled" : true, + "initDelayMillis" : 0, + "delayMillis" : 30000 + }, + "eagleNotificationProps" : { + "eagleStoreEnabled": true, + "kafka_broker":"127.0.0.1:6667" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-data-process/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/src/main/resources/log4j.properties new file mode 100644 index 0000000..d59ded6 --- /dev/null +++ b/eagle-core/eagle-data-process/src/main/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/pom.xml b/eagle-core/eagle-policy/eagle-policy-base/pom.xml index a6f1b02..13bb0d9 100644 --- a/eagle-core/eagle-policy/eagle-policy-base/pom.xml +++ b/eagle-core/eagle-policy/eagle-policy-base/pom.xml @@ -48,7 +48,7 @@ org.apache.eagle - eagle-stream-process-base + eagle-data-process ${project.version} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java deleted file mode 100644 index 2a33460..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.common; - -public class Constants { - public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService"; - public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService"; - public final static String ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME = "AlertStreamSchemaService"; - public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService"; - public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService"; - public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService"; - public final static String ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME = "AlertNotificationService"; - public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin"; - public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp"; - - public static final String SITE_APPLICATION_SERVICE_ENDPOINT_NAME = "SiteApplicationService"; - public static final String SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "SiteDescService"; - public static final String APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME = "ApplicationDescService"; - public static final String FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "FeatureDescService"; - - public static final String TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME = "TopologyExecutionService"; - public static final String TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME = "TopologyOperationService"; - public static final String TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME = "TopologyDescriptionService"; - - public static final String GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME = "GenericResourceService"; - - public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService"; - - public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp"; - public static final String ALERT_EMAIL_COUNT_PROPERTY = "count"; - public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList"; - - public static final String URL = "url"; - public static final String ALERT_SOURCE = "alertSource"; - public static final String ALERT_MESSAGE = "alertMessage"; - public static final String SUBJECT = "subject"; - public static final String ALERT_EXECUTOR_ID = "alertExecutorId"; - public static final String POLICY_NAME = "policyName"; - public static final String POLICY_ID = "policyId"; - public static final String SOURCE_STREAMS = "sourceStreams"; - public static final String ALERT_EVENT = "alertEvent"; - public static final String POLICY_DETAIL_URL = "policyDetailUrl"; - public static final String ALERT_DETAIL_URL = "alertDetailUrl"; - - public static final String POLICY_DEFINITION = "policyDefinition"; - public static final String POLICY_TYPE = "policyType"; - public static final String STREAM_NAME = "streamName"; - public static final String ATTR_NAME = "attrName"; - - public static final String ALERT_EXECUTOR_CONFIGS = "alertExecutorConfigs"; - public static final String PARALLELISM = "parallelism"; - public static final String PARTITIONER = "partitioner"; - public static final String SOURCE = "source"; - public static final String PARTITIONSEQ = "partitionSeq"; - public static final String EXECUTOR_ID = "executorId"; - - public enum policyType { - siddhiCEPEngine, - MachineLearning; - - policyType() { - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java deleted file mode 100644 index a5d506f..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/AttributeType.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.siddhi; - -/** - * @see org.wso2.siddhi.query.api.definition.Attribute.Type - */ -public enum AttributeType { - STRING, - LONG, - INTEGER, - BOOL, - FLOAT, - DOUBLE -// , OBJECT -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext deleted file mode 100644 index cce3aca..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext +++ /dev/null @@ -1,16 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-gc/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-gc/pom.xml b/eagle-gc/pom.xml index 653cdff..f8615ef 100644 --- a/eagle-gc/pom.xml +++ b/eagle-gc/pom.xml @@ -44,7 +44,7 @@ org.apache.eagle - eagle-stream-process-api + eagle-data-process ${project.version} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java index acb442e..ebbeee1 100644 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java @@ -29,13 +29,11 @@ import com.codahale.metrics.MetricRegistry; import com.typesafe.config.Config; import org.apache.eagle.common.config.EagleConfigConstants; import org.apache.eagle.common.config.EagleConfigHelper; -import org.apache.eagle.datastream.*; import org.apache.eagle.gc.common.GCConstants; import org.apache.eagle.gc.model.GCPausedEvent; import org.apache.eagle.metric.reportor.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; import java.util.*; @@ -122,4 +120,4 @@ public class GCMetricGeneratorBolt extends BaseRichBolt { collector.ack(input); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml index 44269b0..f9cfed5 100644 --- a/eagle-jpm/eagle-hadoop-queue/pom.xml +++ b/eagle-jpm/eagle-hadoop-queue/pom.xml @@ -44,7 +44,7 @@ org.apache.eagle - eagle-stream-process-api + eagle-data-process ${project.version} @@ -88,4 +88,4 @@ - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java index a285a6b..caf99ad 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java @@ -23,12 +23,10 @@ import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.apache.eagle.common.config.ConfigOptionParser; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java index b8a5b45..693fdd1 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java @@ -24,7 +24,7 @@ package org.apache.eagle.hadoop.queue.crawler; import backtype.storm.spout.SpoutOutputCollector; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index be06e12..ab56508 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -23,7 +23,7 @@ package org.apache.eagle.hadoop.queue.crawler; import backtype.storm.spout.SpoutOutputCollector; import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.model.applications.App; import org.apache.eagle.hadoop.queue.model.applications.Apps; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 99d83d4..82f433e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -19,7 +19,7 @@ package org.apache.eagle.hadoop.queue.crawler; import backtype.storm.spout.SpoutOutputCollector; -import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; import org.apache.eagle.hadoop.queue.model.scheduler.*; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml index 5568284..bfd4cf2 100644 --- a/eagle-jpm/eagle-jpm-mr-history/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml @@ -33,6 +33,14 @@ http://maven.apache.org + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + org.slf4j slf4j-api @@ -48,22 +56,7 @@ org.apache.eagle - eagle-stream-process-api - ${project.version} - - - org.wso2.orbit.com.lmax - disruptor - - - asm - asm - - - - - org.apache.eagle - eagle-stream-process-base + eagle-data-process ${project.version} @@ -140,4 +133,4 @@ - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java index 447a59a..c99891b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.mr.history.common; import com.typesafe.config.Config; -import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.apache.eagle.common.config.ConfigOptionParser; import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; import org.apache.eagle.jpm.util.JobIdPartitioner; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index ff0c8c8..6f85149 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.crawler; -import org.apache.eagle.dataproc.core.EagleOutputCollector; import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; import org.apache.eagle.jpm.mr.history.parser.JHFParserBase; import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory; @@ -34,13 +33,11 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private JobHistoryContentFilter m_filter; - private EagleOutputCollector m_eagleCollector; private JHFConfigManager m_configManager; public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) { this.m_filter = filter; this.m_configManager = configManager; - this.m_eagleCollector = eagleCollector; } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java new file mode 100644 index 0000000..693e876 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/EagleOutputCollector.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.eagle.dataproc.impl.storm.ValuesArray; + +import java.io.Serializable; + +/** + * expose simple interface for streaming executor to populate output data + * + */ +public interface EagleOutputCollector extends Serializable{ + void collect(ValuesArray t); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java index e055957..211dd9d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,8 +19,7 @@ package org.apache.eagle.jpm.mr.history.crawler; import backtype.storm.spout.SpoutOutputCollector; -import org.apache.eagle.dataproc.core.EagleOutputCollector; -import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.eagle.dataproc.impl.storm.ValuesArray; public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector { private SpoutOutputCollector m_collector; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java index 27de19c..d97e2a3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.parser; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; @@ -124,7 +123,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi ObjectMapper objectMapper = new ObjectMapper(); try { LOG.warn("Unknown task type of task attempt execution entity: "+objectMapper.writeValueAsString(entity)); - } catch (JsonProcessingException e) { + } catch (Exception e) { LOG.error(e.getMessage(),e); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml index e409630..414a221 100644 --- a/eagle-jpm/eagle-jpm-mr-running/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml @@ -38,22 +38,7 @@ org.apache.eagle - eagle-stream-process-api - ${project.version} - - - org.wso2.orbit.com.lmax - disruptor - - - asm - asm - - - - - org.apache.eagle - eagle-stream-process-base + eagle-data-process ${project.version} @@ -123,4 +108,4 @@ - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java index 05e7812..a91a493 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java @@ -19,9 +19,9 @@ package org.apache.eagle.jpm.mr.running.config; import com.typesafe.config.Config; +import org.apache.eagle.common.config.ConfigOptionParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.eagle.dataproc.util.ConfigOptionParser; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-spark-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml index 34d8545..64dd1d8 100644 --- a/eagle-jpm/eagle-jpm-spark-running/pom.xml +++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml @@ -35,7 +35,7 @@ org.apache.eagle - eagle-stream-process-api + eagle-data-process ${project.version} @@ -54,21 +54,6 @@ ${curator.version} - org.apache.eagle - eagle-stream-process-base - ${project.version} - - - org.wso2.orbit.com.lmax - disruptor - - - asm - asm - - - - org.jsoup jsoup http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java index 668bc02..5988273 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.spark.running; import com.typesafe.config.Config; -import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.apache.eagle.common.config.ConfigOptionParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml index da7587d..32392fe 100644 --- a/eagle-security/eagle-metric-collection/pom.xml +++ b/eagle-security/eagle-metric-collection/pom.xml @@ -77,12 +77,7 @@ org.apache.eagle - eagle-stream-process-base - ${project.version} - - - org.apache.eagle - eagle-stream-process-api + eagle-data-process ${project.version} @@ -92,4 +87,4 @@ 0.13.0 - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java index 4ef50ea..646f2d4 100644 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java @@ -43,15 +43,7 @@ public class EagleMetricCollectorApplication extends StormApplication{ @Override public StormTopology execute(Config config, StormEnvironment environment) { String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - @Override - public List deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map map = (Map)tmp; - if(tmp == null) return null; - return Arrays.asList(map.get("user"), map.get("timestamp")); - } - }; + KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config); TopologyBuilder builder = new TopologyBuilder(); BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java index 14b2384..e9bc381 100644 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java @@ -20,11 +20,11 @@ import backtype.storm.spout.Scheme; import backtype.storm.tuple.Fields; import com.typesafe.config.Config; import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; -import org.apache.eagle.datastream.utils.NameConstants; import java.lang.reflect.Constructor; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; /** @@ -35,7 +35,7 @@ import java.util.Properties; */ public class KafkaSourcedSpoutScheme implements Scheme { protected SpoutKafkaMessageDeserializer deserializer; - + public KafkaSourcedSpoutScheme(String deserClsName, Config context){ try{ Properties prop = new Properties(); @@ -48,25 +48,21 @@ public class KafkaSourcedSpoutScheme implements Scheme { throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex); } } - + @Override public List deserialize(byte[] ser) { Object tmp = deserializer.deserialize(ser); - if(tmp == null) - return null; - // the following tasks are executed within the same process of kafka spout - return Arrays.asList(tmp); + Map map = (Map)tmp; + if(tmp == null) return null; + return Arrays.asList(map.get("user"), map.get("timestamp")); } /** * Default only f0, but it requires to be overrode if different - * - * TODO: Handle the schema with KeyValue based structure - * * @return Fields */ @Override public Fields getOutputFields() { - return new Fields(NameConstants.FIELD_PREFIX()+"0"); + return new Fields(NameConstants.FIELD_PREFIX+"0"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java new file mode 100644 index 0000000..cdf6b38 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/NameConstants.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.metric.kafka; + +/** + * Since 8/18/16. + */ +public class NameConstants { + public static final String FIELD_PREFIX = "f"; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java index 28c2764..21aad05 100644 --- a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java +++ b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestDataDistributionDaoImpl.java @@ -20,7 +20,7 @@ package org.apache.eagle.metric.kafka; import org.apache.commons.lang3.time.DateUtils; import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.partition.DataDistributionDao; +import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.junit.Ignore; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java index 0f7a720..eb4715a 100644 --- a/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java +++ b/eagle-security/eagle-metric-collection/src/test/java/org/apache/eagle/metric/kafka/TestGreedyPartition.java @@ -20,8 +20,8 @@ package org.apache.eagle.metric.kafka; import org.apache.commons.lang3.time.DateUtils; import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.partition.DataDistributionDao; -import org.apache.eagle.partition.PartitionAlgorithm; +import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao; +import org.apache.eagle.dataproc.impl.storm.partition.PartitionAlgorithm; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; import org.junit.Ignore; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml index bff3897..186c909 100644 --- a/eagle-security/eagle-security-common/pom.xml +++ b/eagle-security/eagle-security-common/pom.xml @@ -33,7 +33,7 @@ org.apache.eagle - eagle-stream-process-api + eagle-data-process ${project.version} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java index 24deb65..23282d2 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java @@ -20,10 +20,10 @@ package org.apache.eagle.security.partition; import com.sun.jersey.api.client.WebResource; +import org.apache.eagle.dataproc.impl.storm.partition.DataDistributionDao; +import org.apache.eagle.dataproc.impl.storm.partition.Weight; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.metric.MetricConstants; -import org.apache.eagle.partition.DataDistributionDao; -import org.apache.eagle.partition.Weight; import org.apache.eagle.service.client.EagleServiceClientException; import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest; import org.apache.eagle.service.client.IEagleServiceClient; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java index 0197393..fa32d55 100644 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java +++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java @@ -19,8 +19,8 @@ package org.apache.eagle.security.partition; -import org.apache.eagle.partition.PartitionAlgorithm; -import org.apache.eagle.partition.Weight; +import org.apache.eagle.dataproc.impl.storm.partition.PartitionAlgorithm; +import org.apache.eagle.dataproc.impl.storm.partition.Weight; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index 3d452b4..bd23643 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -31,11 +31,7 @@ import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.sink.StormStreamSink; import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping; -import org.apache.eagle.partition.DataDistributionDao; -import org.apache.eagle.partition.PartitionAlgorithm; -import org.apache.eagle.partition.PartitionStrategy; -import org.apache.eagle.partition.PartitionStrategyImpl; +import org.apache.eagle.dataproc.impl.storm.partition.*; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java new file mode 100644 index 0000000..0585663 --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AttributeType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.security.auditlog; + +/** + * @see org.wso2.siddhi.query.api.definition.Attribute.Type + */ +public enum AttributeType { + STRING, + LONG, + INTEGER, + BOOL, + FLOAT, + DOUBLE +// , OBJECT +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java index 2ce0dc1..e06143c 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java @@ -20,7 +20,6 @@ package org.apache.eagle.security.auditlog; import backtype.storm.task.OutputCollector; import com.typesafe.config.Config; -import org.apache.eagle.policy.siddhi.AttributeType; import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity; import org.apache.eagle.service.client.EagleServiceConnector; import org.slf4j.Logger; @@ -189,4 +188,4 @@ public class HdfsUserCommandReassembler { } return siddhiEvent; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b31bac50/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java index 12d0732..812540e 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java @@ -22,7 +22,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; -import org.apache.eagle.dataproc.core.ValuesArray; +import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.jpm.util.*; import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;