activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppatierno <...@git.apache.org>
Subject [GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...
Date Tue, 24 Oct 2017 14:37:35 GMT
Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578570
  
    --- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java
---
    @@ -0,0 +1,484 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.integration.kafka.bridge;
    +
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.SimpleString;
    +import org.apache.activemq.artemis.core.filter.Filter;
    +import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
    +import org.apache.activemq.artemis.core.persistence.StorageManager;
    +import org.apache.activemq.artemis.core.postoffice.Binding;
    +import org.apache.activemq.artemis.core.postoffice.PostOffice;
    +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
    +import org.apache.activemq.artemis.core.server.ConnectorService;
    +import org.apache.activemq.artemis.core.server.Consumer;
    +import org.apache.activemq.artemis.core.server.HandleStatus;
    +import org.apache.activemq.artemis.core.server.MessageReference;
    +import org.apache.activemq.artemis.core.server.Queue;
    +import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
    +import org.apache.activemq.artemis.utils.ConfigurationHelper;
    +import org.apache.activemq.artemis.utils.ReusableLatch;
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.kafka.common.utils.Utils;
    +
    +class KafkaProducerBridge implements Consumer, ConnectorService {
    +
    +   private final String connectorName;
    +
    +   private final String queueName;
    +
    +   private final String topicName;
    +
    +   private final PostOffice postOffice;
    +
    +   private Queue queue = null;
    +
    +   private Filter filter = null;
    +
    +   private String filterString;
    +
    +   private AtomicBoolean isStarted = new AtomicBoolean();
    +
    +   private boolean isConnected = false;
    +
    +   private Producer<String, Message> kafkaProducer;
    +
    +   private Map<String, Object> configuration;
    +
    +   private long sequentialID;
    +
    +   private final ReusableLatch pendingAcks = new ReusableLatch(0);
    +
    +   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
    +
    +   private final ScheduledExecutorService scheduledExecutorService;
    +
    +   private final int retryAttempts;
    +
    +   private final long retryInterval;
    +
    +   private final double retryMultiplier;
    +
    +   private final long retryMaxInterval;
    +
    +   private final AtomicInteger retryCount = new AtomicInteger();
    +
    +   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
    +
    +   private final KafkaProducerFactory<String, Message> kafkaProducerFactory;
    +
    +
    +   KafkaProducerBridge(String connectorName, KafkaProducerFactory<String, Message>
kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager,
PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
    +      this.sequentialID = storageManager.generateID();
    +
    +      this.kafkaProducerFactory = kafkaProducerFactory;
    +
    +      this.connectorName = connectorName;
    +      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME,
null, configuration);
    +      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME,
null, configuration);
    +
    +      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME,
-1, configuration);
    +      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME,
2000, configuration);
    +      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME,
2, configuration);
    +      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME,
30000, configuration);
    +
    +      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING,
null, configuration);
    +
    +      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    +      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
    +      }
    +
    +      this.postOffice = postOffice;
    +      this.configuration = configuration;
    +      this.scheduledExecutorService = scheduledExecutorService;
    +   }
    +
    +   private Map<String, Object> kafkaConfig(Map<String, Object> configuration)
{
    +      Map<String, Object> filteredConfig = new HashMap<>(configuration);
    +      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
    +      return filteredConfig;
    +   }
    +
    +   @Override
    +   public void start() throws Exception {
    +      synchronized (this) {
    +         if (this.isStarted.get()) {
    +            return;
    +         }
    +         if (this.connectorName == null || this.connectorName.trim().equals("")) {
    +            throw new Exception("invalid connector name: " + this.connectorName);
    +         }
    +
    +         if (this.topicName == null || this.topicName.trim().equals("")) {
    +            throw new Exception("invalid topic name: " + topicName);
    +         }
    +
    +         if (this.queueName == null || this.queueName.trim().equals("")) {
    +            throw new Exception("invalid queue name: " + queueName);
    +         }
    +
    +         this.filter = FilterImpl.createFilter(filterString);
    +
    +         SimpleString name = new SimpleString(this.queueName);
    +         Binding b = this.postOffice.getBinding(name);
    +         if (b == null) {
    +            throw new Exception(connectorName + ": queue " + queueName + " not found");
    +         }
    +         this.queue = (Queue) b.getBindable();
    +
    +         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
    +
    +         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
    +         if (topicPartitions == null || topicPartitions.size() == 0) {
    +            throw new Exception(connectorName + ": topic " + topicName + " not found");
    +         }
    +
    +         this.retryCount.set(0);
    +         this.isStarted.set(true);
    +         connect();
    +         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
    +      }
    +   }
    +
    +   public void connect() throws Exception {
    +      synchronized (this) {
    +         if (!isConnected && isStarted.get()) {
    +            isConnected = true;
    +            this.queue.addConsumer(this);
    +            this.queue.deliverAsync();
    +            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void disconnect() {
    +      synchronized (this) {
    +         if (isConnected) {
    +            if (queue != null) {
    +               this.queue.removeConsumer(this);
    +            }
    +
    +            cancelRefs();
    +            if (queue != null) {
    +               queue.deliverAsync();
    +            }
    +            isConnected = false;
    +            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
    +         }
    +      }
    +   }
    +
    +   @Override
    +   public void stop() {
    +      synchronized (this) {
    +         if (!this.isStarted.get()) {
    +            return;
    +         }
    +         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
    +
    +         disconnect();
    +
    +         kafkaProducer.close();
    +         kafkaProducer = null;
    +         this.isStarted.set(false);
    +         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
    +      }
    +   }
    +
    +   @Override
    +   public boolean isStarted() {
    +      return this.isStarted.get();
    +   }
    +
    +   @Override
    +   public String getName() {
    +      return this.connectorName;
    +   }
    +
    +   @Override
    +   public boolean supportsDirectDelivery() {
    +      return false;
    +   }
    +
    +   @Override
    +   public HandleStatus handle(MessageReference ref) throws Exception {
    +      if (filter != null && !filter.match(ref.getMessage())) {
    +         return HandleStatus.NO_MATCH;
    +      }
    +
    +      synchronized (this) {
    +         ref.handled();
    +
    +         Message message = ref.getMessage();
    +
    +         synchronized (refs) {
    +            refs.put(ref.getMessage().getMessageID(), ref);
    +         }
    +
    +         Integer partition = null;
    +         SimpleString groupdID = message.getGroupID();
    +         if (groupdID != null) {
    +            List partitions = kafkaProducer.partitionsFor(topicName);
    +            int numPartitions = partitions.size();
    +            partition = Utils.toPositive(Utils.murmur2(groupdID.getData())) % numPartitions;
    --- End diff --
    
    It's the same way the DefaultPartitioner works in Kafka; why did you re-use the logic here?


---

Mime
View raw message