Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9EB95200C3C for ; Mon, 3 Apr 2017 13:54:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9D558160BB3; Mon, 3 Apr 2017 11:54:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F29D8160BB5 for ; Mon, 3 Apr 2017 13:54:11 +0200 (CEST) Received: (qmail 77417 invoked by uid 500); 3 Apr 2017 11:54:11 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 76672 invoked by uid 99); 3 Apr 2017 11:54:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Apr 2017 11:54:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6CF83DFE61; Mon, 3 Apr 2017 11:54:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.apache.org Date: Mon, 03 Apr 2017 11:54:27 -0000 Message-Id: In-Reply-To: <04280246f21e4dcd9fbfe899c4344da2@git.apache.org> References: <04280246f21e4dcd9fbfe899c4344da2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/84] [partial] eagle git commit: Clean repo for eagle site archived-at: Mon, 03 Apr 2017 11:54:15 -0000 http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java deleted file mode 100644 index 1036a36..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ -package org.apache.eagle.alert.engine.spout; - -import backtype.storm.spout.ISpoutOutputCollector; -import backtype.storm.spout.SpoutOutputCollector; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata; -import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy; -import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - * intercept the message sent from within KafkaSpout and select downstream bolts based on meta-data - * This is topic based. each topic will have one SpoutOutputCollectorWrapper. 
- */ -public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM, SerializationMetadataProvider { - private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorWrapper.class); - - private final ISpoutOutputCollector delegate; - private final String topic; - private final PartitionedEventSerializer serializer; - private int numOfRouterBolts; - private boolean logEventEnabled; - - private volatile List streamRepartitionMetadataList; - private volatile Tuple2StreamConverter converter; - private CorrelationSpout spout; - private volatile Map sds; - - /** - * @param delegate actual SpoutOutputCollector to send data to following bolts - * @param topic topic for this KafkaSpout to handle - * @param numGroupbyBolts bolts following this spout. - * @param serializer - */ - public SpoutOutputCollectorWrapper(CorrelationSpout spout, - ISpoutOutputCollector delegate, - String topic, - SpoutSpec spoutSpec, - int numGroupbyBolts, - Map sds, PartitionedEventSerializer serializer, boolean logEventEnabled) { - super(delegate); - this.spout = spout; - this.delegate = delegate; - this.topic = topic; - this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic); - this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic)); - this.numOfRouterBolts = numGroupbyBolts; - this.sds = sds; - this.serializer = serializer; - this.logEventEnabled = logEventEnabled; - } - - /** - * How to assert that numTotalGroupbyBolts >= numOfRouterBolts, otherwise - * there is runtime issue by default, tuple includes 2 fields field 1: topic - * name field 2: map of key/value. - */ - @SuppressWarnings("rawtypes") - @Override - public List emit(List tuple, Object messageId) { - if (!sanityCheck()) { - LOG.error( - "spout collector for topic {} see monitored metadata invalid, is this data source removed! 
Trigger message id {} ", - topic, messageId); - return null; - } - - KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId); - newMessageId.topic = topic; - newMessageId.timestamp = System.currentTimeMillis(); - /** - phase 1: tuple to stream converter - if this topic multiplexes multiple streams, then retrieve the individual streams. - */ - List convertedTuple = converter.convert(tuple); - if (convertedTuple == null) { - LOG.debug("source data {} can't be converted to a stream, ignore this message", tuple); - spout.ack(newMessageId); - return null; - } - Map m = (Map) convertedTuple.get(3); - Object streamId = convertedTuple.get(1); - - StreamDefinition sd = sds.get(streamId); - if (sd == null) { - LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds); - spout.ack(newMessageId); - return null; - } - - StreamEvent event = convertToStreamEventByStreamDefinition((Long) convertedTuple.get(2), m, sds.get(streamId)); - if (logEventEnabled) { - LOG.info("Spout from topic {} emit event: {}", topic, event); - } - - /* - phase 2: stream repartition - */ - for (StreamRepartitionMetadata md : streamRepartitionMetadataList) { - if (!event.getStreamId().equals(md.getStreamId())) { - continue; - } - // one stream may have multiple group-by strategies, each strategy is for a specific group-by - for (StreamRepartitionStrategy groupingStrategy : md.groupingStrategies) { - int hash = 0; - if (groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) { - hash = getRoutingHashByGroupingStrategy(m, groupingStrategy); - } else if (groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)) { - hash = Math.abs((int) System.currentTimeMillis()); - } - int mod = hash % groupingStrategy.numTotalParticipatingRouterBolts; - // filter out message - if (mod >= groupingStrategy.startSequence && mod < groupingStrategy.startSequence + numOfRouterBolts) { - // framework takes care of field grouping 
instead of using storm internal field grouping - String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName() + (hash % numOfRouterBolts)); - if (LOG.isDebugEnabled()) { - LOG.debug("Emitted tuple: {} with message Id: {}, with topic {}, to streamId {}", convertedTuple, messageId, topic, sid); - } - // send message to StreamRouterBolt - PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash); - if (this.serializer == null) { - delegate.emit(sid, Collections.singletonList(pEvent), newMessageId); - } else { - try { - delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId); - } catch (Exception e) { - LOG.error("Failed to serialize {}, this message would be ignored!", pEvent, e); - spout.ack(newMessageId); - } - } - } else { - // ******* short-cut ack ******** - // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty - // before moving to next offsets. 
- if (LOG.isDebugEnabled()) { - LOG.debug("Message filtered with mod {} not within range {} and {} for message {}", mod, groupingStrategy.startSequence, - groupingStrategy.startSequence + numOfRouterBolts, tuple); - } - spout.ack(newMessageId); - } - } - } - - return null; - } - - @SuppressWarnings("rawtypes") - private int getRoutingHashByGroupingStrategy(Map data, StreamRepartitionStrategy gs) { - // calculate hash value for values from group-by fields - HashCodeBuilder hashCodeBuilder = new HashCodeBuilder(); - for (String groupingField : gs.partition.getColumns()) { - if (data.get(groupingField) != null) { - hashCodeBuilder.append(data.get(groupingField)); - } else { - LOG.warn("Required GroupBy fields {} not found: {}", gs.partition.getColumns(), data); - } - } - int hash = hashCodeBuilder.toHashCode(); - hash = Math.abs(hash); - return hash; - } - - private boolean sanityCheck() { - boolean isOk = true; - if (streamRepartitionMetadataList == null) { - LOG.error("streamRepartitionMetadataList is null!"); - isOk = false; - } - if (converter == null) { - LOG.error("tuple2StreamMetadata is null!"); - isOk = false; - } - return isOk; - } - - @SuppressWarnings( {"rawtypes", "unchecked"}) - private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, Map m, StreamDefinition sd) { - return StreamEvent.builder().timestamep(timestamp).attributes(m, sd).build(); - } - - /** - * SpoutSpec may be changed, this class will respond to changes on tuple2StreamMetadataMap and streamRepartitionMetadataMap. 
- * - * @param spoutSpec - * @param sds - */ - @Override - public void update(SpoutSpec spoutSpec, Map sds) { - this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic); - this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic)); - this.sds = sds; - } - - @Override - public StreamDefinition getStreamDefinition(String streamId) { - return this.sds.get(streamId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java deleted file mode 100644 index fc9cc8a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.utils; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; - -import java.util.Map; - -/** - * Created on 8/16/16. - */ -public class AlertStreamUtils { - - /** - * Create alert stream event for publisher. - */ - public static AlertStreamEvent createAlertEvent(StreamEvent event, - PolicyHandlerContext context, - Map sds) { - PolicyDefinition policyDef = context.getPolicyDefinition(); - AlertStreamEvent alertStreamEvent = new AlertStreamEvent(); - - alertStreamEvent.setTimestamp(event.getTimestamp()); - alertStreamEvent.setData(event.getData()); - alertStreamEvent.setStreamId(policyDef.getOutputStreams().get(0)); - alertStreamEvent.setPolicyId(policyDef.getName()); - - if (context.getPolicyEvaluator() != null) { - alertStreamEvent.setCreatedBy(context.getPolicyEvaluator().getName()); - } - - alertStreamEvent.setCreatedTime(System.currentTimeMillis()); - - String is = policyDef.getInputStreams().get(0); - StreamDefinition sd = sds.get(is); - alertStreamEvent.setSchema(sd); - - return alertStreamEvent; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java deleted file mode 100644 
index 075d827..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.utils; - -import com.google.common.io.ByteStreams; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - - -public class CompressionUtils { - public static byte[] compress(byte[] source) throws IOException { - if (source == null || source.length == 0) { - return source; - } - ByteArrayInputStream sourceStream = new ByteArrayInputStream(source); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(source.length / 2); - try (OutputStream compressor = new GZIPOutputStream(outputStream)) { - ByteStreams.copy(sourceStream, compressor); - compressor.close(); - } - try { - return outputStream.toByteArray(); - } finally { - sourceStream.close(); - outputStream.close(); - } - } - - public static byte[] decompress(byte[] compressed) throws IOException { - if (compressed == null || compressed.length == 0) { - return compressed; - } - ByteArrayInputStream sourceStream = new ByteArrayInputStream(compressed); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressed.length * 2); - try (GZIPInputStream compressor = new GZIPInputStream(sourceStream)) { - ByteStreams.copy(compressor, outputStream); - compressor.close(); - } - try { - return outputStream.toByteArray(); - } finally { - sourceStream.close(); - outputStream.close(); - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java deleted file mode 100644 index 2229219..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.utils; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; - -/** - * Since 5/6/16. 
- */ -public class MetadataSerDeser { - private static final Logger LOG = LoggerFactory.getLogger(MetadataSerDeser.class); - - @SuppressWarnings("rawtypes") - public static K deserialize(InputStream is, TypeReference typeRef) { - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(is, typeRef); - return spec; - } catch (Exception ex) { - LOG.error("error in deserializing metadata of type {} from input stream", - new TypeReference() { - }.getType().getClass().getCanonicalName(), ex); - } - return null; - } - - public static K deserialize(InputStream is, Class cls) { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); - try { - K spec = mapper.readValue(is, cls); - return spec; - } catch (Exception ex) { - LOG.error("Got error to deserialize metadata of type {} from input stream", - new TypeReference() { - }.getType().getClass().getCanonicalName(), ex); - } - return null; - } - - @SuppressWarnings("rawtypes") - public static K deserialize(String json, TypeReference typeRef) { - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(json, typeRef); - return spec; - } catch (Exception ex) { - LOG.error("error in deserializing metadata of type {} from {}", - new TypeReference() { - }.getType().getClass().getCanonicalName(), json, ex); - } - return null; - } - - public static K deserialize(String json, Class cls) { - ObjectMapper mapper = new ObjectMapper(); - try { - K spec = mapper.readValue(json, cls); - return spec; - } catch (Exception ex) { - LOG.error("error in deserializing metadata of type {} from {}", - new TypeReference() { - }.getType().getClass().getCanonicalName(), json, ex); - } - return null; - } - - public static String serialize(K spec) { - ObjectMapper mapper = new ObjectMapper(); - try { - String json = mapper.writeValueAsString(spec); - return json; - } catch (Exception ex) { - LOG.error("error in serializing object {} with type {}", spec, - new 
TypeReference() { - }.getType().getClass().getCanonicalName(), ex); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java deleted file mode 100644 index 509cbf4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.utils; - -import org.xerial.snappy.SnappyInputStream; -import org.xerial.snappy.SnappyOutputStream; - -import java.io.*; - - -/** - * Utilities for working with Serializables. 
- * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils" - */ -public class SerializableUtils { - /** - * Serializes the argument into an array of bytes, and returns it. - * - * @throws IllegalArgumentException if there are errors when serializing - */ - public static byte[] serializeToCompressedByteArray(Object value) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) { - oos.writeObject(value); - } - return buffer.toByteArray(); - } catch (IOException exn) { - throw new IllegalArgumentException( - "unable to serialize " + value, - exn); - } - } - - /** - * Serializes the argument into an array of bytes, and returns it. - * - * @throws IllegalArgumentException if there are errors when serializing - */ - public static byte[] serializeToByteArray(Object value) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) { - oos.writeObject(value); - } - return buffer.toByteArray(); - } catch (IOException exn) { - throw new IllegalArgumentException("unable to serialize " + value, exn); - } - } - - /** - * Deserializes an object from the given array of bytes, e.g., as - * serialized using {@link #serializeToCompressedByteArray}, and returns it. 
- * - * @throws IllegalArgumentException if there are errors when - * deserializing, using the provided description to identify what - * was being deserialized - */ - public static Object deserializeFromByteArray(byte[] encodedValue, - String description) { - try { - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) { - return ois.readObject(); - } - } catch (IOException | ClassNotFoundException exn) { - throw new IllegalArgumentException( - "unable to deserialize " + description, - exn); - } - } - - /** - * Deserializes an object from the given array of bytes, e.g., as - * serialized using {@link #serializeToCompressedByteArray}, and returns it. - * - * @throws IllegalArgumentException if there are errors when - * deserializing, using the provided description to identify what - * was being deserialized - */ - public static Object deserializeFromCompressedByteArray(byte[] encodedValue, - String description) { - try { - try (ObjectInputStream ois = new ObjectInputStream( - new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) { - return ois.readObject(); - } - } catch (IOException | ClassNotFoundException exn) { - throw new IllegalArgumentException( - "unable to deserialize " + description, - exn); - } - } - - public static T ensureSerializable(T value) { - @SuppressWarnings("unchecked") - T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), - value.toString()); - return copy; - } - - public static T clone(T value) { - @SuppressWarnings("unchecked") - T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), - value.toString()); - return copy; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SingletonExecutor.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SingletonExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SingletonExecutor.java deleted file mode 100644 index fdb9ed8..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/SingletonExecutor.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ - -package org.apache.eagle.alert.engine.utils; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class SingletonExecutor { - - public static synchronized ExecutorService getExecutorService() { - return executorService; - } - - private static ExecutorService executorService; - - private static SingletonExecutor executor = new SingletonExecutor(); - - public SingletonExecutor() { - executorService = Executors.newFixedThreadPool(5); - } - - public static void main(String[] args) { - System.out.println(SingletonExecutor.getExecutorService()); - System.out.println(SingletonExecutor.getExecutorService()); - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java deleted file mode 100644 index 935742f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutMetric.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.metric.api.IMetric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Since 5/18/16. - * The original storm.kafka.KafkaSpout has some issues like the following - * 1) can only support one single topic - * 2) can only be initialized at open(), can't dynamically support another topic. - */ -public class KafkaSpoutMetric implements IMetric { - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutMetric.class); - private Map metricContextMap = new ConcurrentHashMap<>(); - private Map offsetMetricMap = new ConcurrentHashMap<>(); - - public static class KafkaSpoutMetricContext { - SpoutConfig spoutConfig; - DynamicPartitionConnections connections; - PartitionCoordinator coordinator; - } - - public void addTopic(String topic, KafkaSpoutMetricContext context) { - // construct KafkaOffsetMetric - KafkaUtils.KafkaOffsetMetric kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(context.spoutConfig.topic, context.connections); - metricContextMap.put(topic, context); - offsetMetricMap.put(topic, kafkaOffsetMetric); - } - - public void removeTopic(String topic) { - metricContextMap.remove(topic); - offsetMetricMap.remove(topic); - } - - @SuppressWarnings( {"unchecked", "rawtypes"}) - @Override - public Object getValueAndReset() { - HashMap spoutMetric = new HashMap(); - for (Map.Entry entry : metricContextMap.entrySet()) { - // construct offset metric - List pms = 
entry.getValue().coordinator.getMyManagedPartitions(); - Set latestPartitions = new HashSet(); - for (PartitionManager pm : pms) { - latestPartitions.add(pm.getPartition()); - } - - KafkaUtils.KafkaOffsetMetric offsetMetric = offsetMetricMap.get(entry.getKey()); - offsetMetric.refreshPartitions(latestPartitions); - for (PartitionManager pm : pms) { - offsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); - } - Object o = offsetMetric.getValueAndReset(); - if (o != null) { - ((HashMap) o).forEach( - (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v) - ); - } - - // construct partition metric - for (PartitionManager pm : pms) { - pm.getMetricsDataMap().forEach( - (k, v) -> spoutMetric.put(k + "_" + entry.getKey(), v) - ); - } - } - return spoutMetric; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java deleted file mode 100644 index 2bdbd3c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/storm/kafka/KafkaSpoutWrapper.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package storm.kafka; - -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.spout.ISpoutSpecLCM; -import org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper; -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * NOTE!!!!! This class copy/paste some code from storm.kafka.KafkaSpout to make sure it can support one process to hold multiple - * KafkaSpout - * - *

this collectorWrapper provides the following capabilities: - * 1. inject customized collector collectorWrapper, so framework can control traffic routing - * 2. listen to topic to stream metadata change and pass that to customized collector collectorWrapper - * 3. return current streams for this topic

- */ -public class KafkaSpoutWrapper extends KafkaSpout implements ISpoutSpecLCM { - private static final long serialVersionUID = 5507693757424351306L; - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutWrapper.class); - private KafkaSpoutMetric kafkaSpoutMetric; - - public KafkaSpoutWrapper(SpoutConfig spoutConf, KafkaSpoutMetric kafkaSpoutMetric) { - super(spoutConf); - this.kafkaSpoutMetric = kafkaSpoutMetric; - } - - private SpoutOutputCollectorWrapper collectorWrapper; - - @SuppressWarnings( {"unchecked", "rawtypes"}) - @Override - public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - String topologyInstanceId = context.getStormId(); - ////// !!!! begin copying code from storm.kafka.KafkaSpout to here - _collector = collector; - - Map stateConf = new HashMap(conf); - List zkServers = _spoutConfig.zkServers; - if (zkServers == null) { - zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); - } - Integer zkPort = _spoutConfig.zkPort; - if (zkPort == null) { - zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); - } - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); - stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); - _state = new ZkState(stateConf); - - _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); - - // using TransactionalState like this is a hack - int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); - if (_spoutConfig.hosts instanceof StaticHosts) { - _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId); - } else { - _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, topologyInstanceId); - } - - ////// 
!!!! end copying code from storm.kafka.KafkaSpout to here - - // add new topic to metric - KafkaSpoutMetric.KafkaSpoutMetricContext metricContext = new KafkaSpoutMetric.KafkaSpoutMetricContext(); - metricContext.connections = _connections; - metricContext.coordinator = _coordinator; - metricContext.spoutConfig = _spoutConfig; - kafkaSpoutMetric.addTopic(_spoutConfig.topic, metricContext); - - this.collectorWrapper = (SpoutOutputCollectorWrapper) collector; - } - - @Override - public void update(SpoutSpec metadata, Map sds) { - collectorWrapper.update(metadata, sds); - } - - @Override - public void close() { - super.close(); - kafkaSpoutMetric.removeTopic(_spoutConfig.topic); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm deleted file mode 100644 index 3926cc8..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm +++ /dev/null @@ -1,301 +0,0 @@ - - - - #set ( $alert = $alertList[0] ) - - - - $alert["alertSubject"] - - - - - - - - - - -
- - - - - -
-

Eagle Alert Notification

-
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - -## -## -## -## - - - - - - - - - - - - - - - - - -
-

Warning: $alert["alertSubject"]

-
- - - - #set ( $alertSeverity = $alert["alertSeverity"] ) - #if (!$alertSeverity || ("$alertSeverity" == "")) - #set ( $alert["alertSeverity"] = "WARNING") - #end - - -
-

Detected Time: $alert["alertTime"]

-
-

- Severity: - #if ($alert["alertSeverity"] == "WARNING") - $alert["alertSeverity"] - #else - $alert["alertSeverity"] - #end -

-
-
-

Alert Message

-
-

$alert["alertBody"]

-
-

Alert Detail

-
- - - - - - - - - - - - - - - - - - - - - -
-

Policy Name

-
-

$alert["policyId"]

-
-

Severity Level

-
-

$alert["alertSeverity"]

-
-

Alert Stream

-
-

$alert["streamId"]

-
-

Created Time

-
-

$alert["alertTime"]

-
-

Created By

-
-

$alert["creator"]

-
-
-## -## -## -## -##
-## View Policy Details -##
-##
- - - - -
- View Alert on Eagle -
-
-

Actions Required

-
-

- The alert notification was automatically detected and sent by Eagle according to policy: $alert["policyId"]. - To follow up on this, please verify the alert and diagnose the root cause with Eagle: -

-

- -
-

Powered by Apache Eagle (version: $alert["version"])

-
-
- - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm deleted file mode 100644 index 0e3d5fe..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm +++ /dev/null @@ -1,259 +0,0 @@ - - -
#set ( $alert = $alertList[0] )

## Generate Alert Color
## Starts from the default color and overrides it per severity.
## "FATAL" is matched in addition to the legacy misspelling "FETAL", which is kept so that any
## producer still emitting the old spelling keeps its color.
#set($alertColor = "#337ab7")
#if($alert["alertSeverity"] == "WARNING")
    #set($alertColor = "#FF9F00")
#elseif($alert["alertSeverity"] == "CRITICAL" || $alert["alertSeverity"] == "FATAL" || $alert["alertSeverity"] == "FETAL")
    #set($alertColor = "#d43f3a")
#elseif ($alert["alertSeverity"] == "OK")
    #set($alertColor = "#68B90F")
#end
- - - - - - [$alert["alertSeverity"] $alert["alertSubject"] - - - - - - - - - - - -
-
- - - - - - - -
- $alert["alertSeverity"]: - $alert["alertSubject"] -
- - - - - - - - - - - - - - - -
- CATEGORY: #if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end TIME: $alert["alertTime"] -
-
- $alert["alertBody"] -
-
- - - - -
- - - - - - - - - - - - - - - - - -
- Severity - $alert["alertSeverity"] -
Category - - #if($alert["alertCategory"]) - $alert["alertCategory"] - #else - N/A - #end -
Cluster - $alert["siteId"] -
Policy - $alert[ - "policyId"] -
-
-
- - View Alert Details - -
-
- -
-
- - http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm deleted file mode 100644 index f273917..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm +++ /dev/null @@ -1,495 +0,0 @@ - - - - - #set ( $alert = $alertList[0] ) - - - - [$alert["alertSeverity"]$alert["alertSubject"] - - - - - - - - - - - -
-
- - - - - - - -
- $alert["alertSeverity"] $alert["alertSubject"] -
- - - - - - - - - - - - - - - - -
- - - - - - - - - -
Category:$alert["alertCategory"]Time:$alert["alertTime"]
-
-
- $alert["alertBody"] -
-
- - - - - - - - - - - -
Cluster$alert["siteId"]
Policy$alert["policyId"]
-
- View Alert Details -
- Note: The alert was automatically detected by Eagle. -
-
- -
-
- -