From: ppatierno
To: dev@activemq.apache.org
Reply-To: dev@activemq.apache.org
Subject: [GitHub] activemq-artemis pull request #1607: ARTEMIS-1478 - ActiveMQ Artemis to Kafk...
Date: Tue, 24 Oct 2017 14:37:35 +0000 (UTC)

Github user ppatierno commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1607#discussion_r146578570

--- Diff: integration/activemq-kafka/activemq-kafka-bridge/src/main/java/org/apache/activemq/artemis/integration/kafka/bridge/KafkaProducerBridge.java ---
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.integration.kafka.bridge;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConnectorService;
+import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.HandleStatus;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.integration.kafka.protocol.core.CoreMessageSerializer;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+
+class KafkaProducerBridge implements Consumer, ConnectorService {
+
+   private final String connectorName;
+
+   private final String queueName;
+
+   private final String topicName;
+
+   private final PostOffice postOffice;
+
+   private Queue queue = null;
+
+   private Filter filter = null;
+
+   private String filterString;
+
+   private AtomicBoolean isStarted = new AtomicBoolean();
+
+   private boolean isConnected = false;
+
+   private Producer<String, Message> kafkaProducer;
+
+   private Map<String, Object> configuration;
+
+   private long sequentialID;
+
+   private final ReusableLatch pendingAcks = new ReusableLatch(0);
+
+   private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
+
+   private final ScheduledExecutorService scheduledExecutorService;
+
+   private final int retryAttempts;
+
+   private final long retryInterval;
+
+   private final double retryMultiplier;
+
+   private final long retryMaxInterval;
+
+   private final AtomicInteger retryCount = new AtomicInteger();
+
+   private final AtomicBoolean awaitingReconnect = new AtomicBoolean();
+
+   private final KafkaProducerFactory kafkaProducerFactory;
+
+   KafkaProducerBridge(String connectorName, KafkaProducerFactory kafkaProducerFactory, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
+      this.sequentialID = storageManager.generateID();
+
+      this.kafkaProducerFactory = kafkaProducerFactory;
+
+      this.connectorName = connectorName;
+      this.queueName = ConfigurationHelper.getStringProperty(KafkaConstants.QUEUE_NAME, null, configuration);
+      this.topicName = ConfigurationHelper.getStringProperty(KafkaConstants.TOPIC_NAME, null, configuration);
+
+      this.retryAttempts = ConfigurationHelper.getIntProperty(KafkaConstants.RETRY_ATTEMPTS_NAME, -1, configuration);
+      this.retryInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_INTERVAL_NAME, 2000, configuration);
+      this.retryMultiplier = ConfigurationHelper.getDoubleProperty(KafkaConstants.RETRY_MULTIPLIER_NAME, 2, configuration);
+      this.retryMaxInterval = ConfigurationHelper.getLongProperty(KafkaConstants.RETRY_MAX_INTERVAL_NAME, 30000, configuration);
+
+      this.filterString = ConfigurationHelper.getStringProperty(KafkaConstants.FILTER_STRING, null, configuration);
+
+      configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+      if (!configuration.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+         configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoreMessageSerializer.class.getName());
+      }
+
+      this.postOffice = postOffice;
+      this.configuration = configuration;
+      this.scheduledExecutorService = scheduledExecutorService;
+   }
+
+   private Map<String, Object> kafkaConfig(Map<String, Object> configuration) {
+      Map<String, Object> filteredConfig = new HashMap<>(configuration);
+      KafkaConstants.OPTIONAL_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
+      KafkaConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS.forEach(filteredConfig::remove);
+      return filteredConfig;
+   }
+
+   @Override
+   public void start() throws Exception {
+      synchronized (this) {
+         if (this.isStarted.get()) {
+            return;
+         }
+         if (this.connectorName == null || this.connectorName.trim().equals("")) {
+            throw new Exception("invalid connector name: " + this.connectorName);
+         }
+
+         if (this.topicName == null || this.topicName.trim().equals("")) {
+            throw new Exception("invalid topic name: " + topicName);
+         }
+
+         if (this.queueName == null || this.queueName.trim().equals("")) {
+            throw new Exception("invalid queue name: " + queueName);
+         }
+
+         this.filter = FilterImpl.createFilter(filterString);
+
+         SimpleString name = new SimpleString(this.queueName);
+         Binding b = this.postOffice.getBinding(name);
+         if (b == null) {
+            throw new Exception(connectorName + ": queue " + queueName + " not found");
+         }
+         this.queue = (Queue) b.getBindable();
+
+         this.kafkaProducer = kafkaProducerFactory.create(kafkaConfig(configuration));
+
+         List<PartitionInfo> topicPartitions = kafkaProducer.partitionsFor(topicName);
+         if (topicPartitions == null || topicPartitions.size() == 0) {
+            throw new Exception(connectorName + ": topic " + topicName + " not found");
+         }
+
+         this.retryCount.set(0);
+         this.isStarted.set(true);
+         connect();
+         ActiveMQKafkaLogger.LOGGER.bridgeStarted(connectorName);
+      }
+   }
+
+   public void connect() throws Exception {
+      synchronized (this) {
+         if (!isConnected && isStarted.get()) {
+            isConnected = true;
+            this.queue.addConsumer(this);
+            this.queue.deliverAsync();
+            ActiveMQKafkaLogger.LOGGER.bridgeConnected(connectorName);
+         }
+      }
+   }
+
+   @Override
+   public void disconnect() {
+      synchronized (this) {
+         if (isConnected) {
+            if (queue != null) {
+               this.queue.removeConsumer(this);
+            }
+
+            cancelRefs();
+            if (queue != null) {
+               queue.deliverAsync();
+            }
+            isConnected = false;
+            ActiveMQKafkaLogger.LOGGER.bridgeDisconnected(connectorName);
+         }
+      }
+   }
+
+   @Override
+   public void stop() {
+      synchronized (this) {
+         if (!this.isStarted.get()) {
+            return;
+         }
+         ActiveMQKafkaLogger.LOGGER.bridgeReceivedStopRequest(connectorName);
+
+         disconnect();
+
+         kafkaProducer.close();
+         kafkaProducer = null;
+         this.isStarted.set(false);
+         ActiveMQKafkaLogger.LOGGER.bridgeStopped(connectorName);
+      }
+   }
+
+   @Override
+   public boolean isStarted() {
+      return this.isStarted.get();
+   }
+
+   @Override
+   public String getName() {
+      return this.connectorName;
+   }
+
+   @Override
+   public boolean supportsDirectDelivery() {
+      return false;
+   }
+
+   @Override
+   public HandleStatus handle(MessageReference ref) throws Exception {
+      if (filter != null && !filter.match(ref.getMessage())) {
+         return HandleStatus.NO_MATCH;
+      }
+
+      synchronized (this) {
+         ref.handled();
+
+         Message message = ref.getMessage();
+
+         synchronized (refs) {
+            refs.put(ref.getMessage().getMessageID(), ref);
+         }
+
+         Integer partition = null;
+         SimpleString groupId = message.getGroupID();
+         if (groupId != null) {
+            List<PartitionInfo> partitions = kafkaProducer.partitionsFor(topicName);
+            int numPartitions = partitions.size();
+            partition = Utils.toPositive(Utils.murmur2(groupId.getData())) % numPartitions;
--- End diff --

This is exactly how the DefaultPartitioner in Kafka already works, so why did you duplicate the logic here?

---
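For context on the comment above: the hunk computes the partition by hand with the same murmur2 hash that Kafka's DefaultPartitioner applies to a record key. A minimal sketch of the equivalence, using only Kafka's public org.apache.kafka.common.utils.Utils helpers; the class name PartitioningSketch and the keyed-record alternative are illustrative, not taken from the PR:

import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

class PartitioningSketch {

   // What the bridge does by hand: hash the group ID with murmur2 and
   // map it onto the topic's partition count, exactly as the
   // DefaultPartitioner does for a keyed record.
   static int manualPartition(Producer<String, ?> producer, String topic, byte[] groupIdBytes) {
      List<PartitionInfo> partitions = producer.partitionsFor(topic);
      return Utils.toPositive(Utils.murmur2(groupIdBytes)) % partitions.size();
   }

   // The alternative the review comment implies: set the group ID as the
   // record key and leave the partition null, so the producer's own
   // DefaultPartitioner performs the same murmur2-based assignment.
   static <V> ProducerRecord<String, V> keyedRecord(String topic, String groupId, V value) {
      return new ProducerRecord<>(topic, null, groupId, value);
   }
}

Both paths send messages with the same group ID to the same partition; the keyed-record form simply avoids duplicating the partitioner's logic in the bridge.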
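Separately, the retry settings read in the constructor (RETRY_INTERVAL_NAME defaulting to 2000 ms, RETRY_MULTIPLIER_NAME to 2, RETRY_MAX_INTERVAL_NAME to 30000 ms) point to a capped exponential reconnect backoff, though the scheduling code itself is outside this hunk. A minimal sketch of how such settings are conventionally combined; the class and method names here are illustrative, not from the PR:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class BackoffSketch {

   private final long retryInterval = 2000;      // ms, delay of the first attempt
   private final double retryMultiplier = 2;     // growth factor per attempt
   private final long retryMaxInterval = 30000;  // ms, upper bound on any delay

   // Delay for the given attempt: interval * multiplier^attempt, capped
   // at the configured maximum.
   long delayFor(int attempt) {
      double delay = retryInterval * Math.pow(retryMultiplier, attempt);
      return Math.min((long) delay, retryMaxInterval);
   }

   // Schedule the next reconnect attempt on the bridge's executor.
   void scheduleReconnect(ScheduledExecutorService executor, int attempt, Runnable connect) {
      executor.schedule(connect, delayFor(attempt), TimeUnit.MILLISECONDS);
   }
}

With the defaults above, attempts would fire after 2 s, 4 s, 8 s, 16 s, then 30 s thereafter, until retryAttempts (-1 meaning unlimited) is exhausted.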