eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject eagle git commit: [MINOR] Fix metadata updating bug by impl equals/hashCode
Date Mon, 13 Mar 2017 09:15:56 GMT
Repository: eagle
Updated Branches:
  refs/heads/master a71a36bf4 -> 7e41ed0b9


[MINOR] Fix metadata updating bug by impl equals/hashCode

Fix metadata updating bug by impl equals/hashCode

Author: Hao Chen <hao@apache.org>

Closes #868 from haoch/FixMetadataUpdate.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/7e41ed0b
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/7e41ed0b
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/7e41ed0b

Branch: refs/heads/master
Commit: 7e41ed0b9d9c6bcbd5fef156ae5ca1cdc7e16c28
Parents: a71a36b
Author: Hao Chen <hao@apache.org>
Authored: Mon Mar 13 17:15:39 2017 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Mon Mar 13 17:15:39 2017 +0800

----------------------------------------------------------------------
 .../app/messaging/KafkaStreamSinkConfig.java    | 52 +++++++++++++++-
 .../app/messaging/KafkaStreamSourceConfig.java  | 65 ++++++++++++++++++++
 .../impl/StreamMetadataUpdateServiceImpl.java   |  9 +--
 .../apache/eagle/metadata/model/StreamDesc.java | 35 ++++++++++-
 4 files changed, 154 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
index 4a72709..bdc4f53 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSinkConfig.java
@@ -16,8 +16,11 @@
  */
 package org.apache.eagle.app.messaging;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.metadata.model.StreamSinkConfig;
 
+import java.util.Objects;
+
 /**
  * FIXME Rename to KafkaStreamMessagingConfig.
  */
@@ -111,5 +114,52 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
         return KafkaStreamSinkConfig.class;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof KafkaStreamSinkConfig)) {
+            return false;
+        }
+
+        KafkaStreamSinkConfig config = (KafkaStreamSinkConfig) o;
+
+        if (!getTopicId().equals(config.getTopicId())) {
+            return false;
+        }
+        if (getBrokerList() != null ? !getBrokerList().equals(config.getBrokerList()) : config.getBrokerList()
!= null) {
+            return false;
+        }
+        if (getSerializerClass() != null ? !getSerializerClass().equals(config.getSerializerClass())
: config.getSerializerClass() != null) {
+            return false;
+        }
+        if (getKeySerializerClass() != null ? !getKeySerializerClass().equals(config.getKeySerializerClass())
: config.getKeySerializerClass() != null) {
+            return false;
+        }
+        if (getNumBatchMessages() != null ? !getNumBatchMessages().equals(config.getNumBatchMessages())
: config.getNumBatchMessages() != null) {
+            return false;
+        }
+        if (getMaxQueueBufferMs() != null ? !getMaxQueueBufferMs().equals(config.getMaxQueueBufferMs())
: config.getMaxQueueBufferMs() != null) {
+            return false;
+        }
+        if (getProducerType() != null ? !getProducerType().equals(config.getProducerType())
: config.getProducerType() != null) {
+            return false;
+        }
+        return getRequestRequiredAcks() != null ? getRequestRequiredAcks().equals(config.getRequestRequiredAcks())
: config.getRequestRequiredAcks() == null;
 
-}
+    }
+
+    @Override
+    public int hashCode() {
+        int result = getTopicId().hashCode();
+        result = 31 * result + (getBrokerList() != null ? getBrokerList().hashCode() : 0);
+        result = 31 * result + (getSerializerClass() != null ? getSerializerClass().hashCode()
: 0);
+        result = 31 * result + (getKeySerializerClass() != null ? getKeySerializerClass().hashCode()
: 0);
+        result = 31 * result + (getNumBatchMessages() != null ? getNumBatchMessages().hashCode()
: 0);
+        result = 31 * result + (getMaxQueueBufferMs() != null ? getMaxQueueBufferMs().hashCode()
: 0);
+        result = 31 * result + (getProducerType() != null ? getProducerType().hashCode()
: 0);
+        result = 31 * result + (getRequestRequiredAcks() != null ? getRequestRequiredAcks().hashCode()
: 0);
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
index e6fdb83..d0a91da 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
@@ -148,4 +148,69 @@ public class KafkaStreamSourceConfig implements StreamSourceConfig {
     public void setTopicId(String topicId) {
         this.topicId = topicId;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof KafkaStreamSourceConfig)) {
+            return false;
+        }
+
+        KafkaStreamSourceConfig that = (KafkaStreamSourceConfig) o;
+
+        if (getFetchSize() != that.getFetchSize()) {
+            return false;
+        }
+        if (getTransactionStateUpdateMS() != that.getTransactionStateUpdateMS()) {
+            return false;
+        }
+        if (getStartOffsetTime() != that.getStartOffsetTime()) {
+            return false;
+        }
+        if (isForceFromStart() != that.isForceFromStart()) {
+            return false;
+        }
+        if (getTopicId() != null ? !getTopicId().equals(that.getTopicId()) : that.getTopicId()
!= null) {
+            return false;
+        }
+        if (getBrokerZkQuorum() != null ? !getBrokerZkQuorum().equals(that.getBrokerZkQuorum())
: that.getBrokerZkQuorum() != null) {
+            return false;
+        }
+        if (getBrokerZkBasePath() != null ? !getBrokerZkBasePath().equals(that.getBrokerZkBasePath())
: that.getBrokerZkBasePath() != null) {
+            return false;
+        }
+        if (getTransactionZkServers() != null ? !getTransactionZkServers().equals(that.getTransactionZkServers())
: that.getTransactionZkServers() != null) {
+            return false;
+        }
+        if (getTransactionZKRoot() != null ? !getTransactionZKRoot().equals(that.getTransactionZKRoot())
: that.getTransactionZKRoot() != null) {
+            return false;
+        }
+        if (getConsumerGroupId() != null ? !getConsumerGroupId().equals(that.getConsumerGroupId())
: that.getConsumerGroupId() != null) {
+            return false;
+        }
+        if (getBrokerZkPath() != null ? !getBrokerZkPath().equals(that.getBrokerZkPath())
: that.getBrokerZkPath() != null) {
+            return false;
+        }
+        return getSchemaClass() != null ? getSchemaClass().equals(that.getSchemaClass())
: that.getSchemaClass() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = getTopicId() != null ? getTopicId().hashCode() : 0;
+        result = 31 * result + (getBrokerZkQuorum() != null ? getBrokerZkQuorum().hashCode()
: 0);
+        result = 31 * result + (getBrokerZkBasePath() != null ? getBrokerZkBasePath().hashCode()
: 0);
+        result = 31 * result + (getTransactionZkServers() != null ? getTransactionZkServers().hashCode()
: 0);
+        result = 31 * result + getFetchSize();
+        result = 31 * result + (getTransactionZKRoot() != null ? getTransactionZKRoot().hashCode()
: 0);
+        result = 31 * result + (getConsumerGroupId() != null ? getConsumerGroupId().hashCode()
: 0);
+        result = 31 * result + (getBrokerZkPath() != null ? getBrokerZkPath().hashCode()
: 0);
+        result = 31 * result + (int) (getTransactionStateUpdateMS() ^ (getTransactionStateUpdateMS()
>>> 32));
+        result = 31 * result + getStartOffsetTime();
+        result = 31 * result + (isForceFromStart() ? 1 : 0);
+        result = 31 * result + (getSchemaClass() != null ? getSchemaClass().hashCode() :
0);
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
index 10bad33..274f0da 100644
--- a/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-streamproxy/src/main/java/org/apache/eagle/app/proxy/stream/impl/StreamMetadataUpdateServiceImpl.java
@@ -76,17 +76,18 @@ class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService
{
                 List<StreamDesc> streamDescList = appEntity.getStreams();
                 if (streamDescList != null && streamDescList.size() > 0) {
                     for (StreamDesc streamDesc : streamDescList) {
+                        total++;
                         latestStreamIdDescMap.put(streamDesc.getStreamId(), streamDesc);
-                        if (streamIdDescMap.containsKey(streamDesc.getStreamId()) &&
!Objects.equals(streamDesc, streamIdDescMap.get(streamDesc.getStreamId()))) {
-                            this.listener.onStreamChanged(streamDesc);
+                        if (streamIdDescMap.containsKey(streamDesc.getStreamId())
+                            && !streamDesc.equals(streamIdDescMap.get(streamDesc.getStreamId())))
{
                             changed++;
+                            this.listener.onStreamChanged(streamDesc);
                         } else if (!streamIdDescMap.containsKey(streamDesc.getStreamId()))
{
                             added++;
                             this.listener.onStreamAdded(streamDesc);
                         }
                     }
                 }
-                total++;
             }
 
             for (String streamId : streamIdDescMap.keySet()) {
@@ -95,12 +96,12 @@ class StreamMetadataUpdateServiceImpl implements StreamMetadataUpdateService
{
                     this.listener.onStreamRemoved(streamIdDescMap.get(streamId));
                 }
             }
-            this.streamIdDescMap = latestStreamIdDescMap;
             if (added > 0 || changed > 0 || removed > 0) {
                 LOGGER.info("Loaded stream metadata: total = {}, added = {}, changed = {},
removed = {}", total, added, changed, removed);
             } else {
                 LOGGER.debug("Loaded stream metadata: total = {}, added = {}, changed = {},
removed = {}", total, added, changed, removed);
             }
+            this.streamIdDescMap = latestStreamIdDescMap;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/7e41ed0b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
index a183351..a5d5ca8 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamDesc.java
@@ -19,8 +19,6 @@ package org.apache.eagle.metadata.model;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.metadata.utils.StreamIdConversions;
 
-import javax.xml.transform.stream.StreamSource;
-
 public class StreamDesc {
     private String streamId;
     private StreamDefinition schema;
@@ -58,4 +56,37 @@ public class StreamDesc {
     public void setSourceConfig(StreamSourceConfig sourceConfig) {
         this.sourceConfig = sourceConfig;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof StreamDesc)) {
+            return false;
+        }
+
+        StreamDesc that = (StreamDesc) o;
+
+        if (!getStreamId().equals(that.getStreamId())) {
+            return false;
+        }
+        if (getSchema() != null ? !getSchema().equals(that.getSchema()) : that.getSchema()
!= null) {
+            return false;
+        }
+        if (getSinkConfig() != null ? !getSinkConfig().equals(that.getSinkConfig()) : that.getSinkConfig()
!= null) {
+            return false;
+        }
+        return getSourceConfig() != null ? getSourceConfig().equals(that.getSourceConfig())
: that.getSourceConfig() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = getStreamId().hashCode();
+        result = 31 * result + (getSchema() != null ? getSchema().hashCode() : 0);
+        result = 31 * result + (getSinkConfig() != null ? getSinkConfig().hashCode() : 0);
+        result = 31 * result + (getSourceConfig() != null ? getSourceConfig().hashCode()
: 0);
+        return result;
+    }
 }
\ No newline at end of file


Mime
View raw message