kafka-jira mailing list archives

From "Mauro Giacometti (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
Date Thu, 23 Nov 2017 16:47:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264561#comment-16264561 ]

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:46 PM:
-------------------------------------------------------------------

Hi Ismal,
I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague
and he's working with me on this project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager
provided by Spring. We chose this approach because we need to implement a one-phase commit. We need
to open a single "unique transaction" (managed by the ChainedTransactionManager) that first opens
a transaction on the Kafka broker and then a transaction on an Oracle DB, in which a write operation is
performed. A final commit on the DB is then performed, immediately followed by a commit
of the Kafka transaction. Every rollback in the JPA transaction must lead to a Kafka transaction
rollback.
Moreover, we're using spring-integration-kafka and Apache Avro to manipulate, serialize, and deserialize
the messages we publish to the Kafka broker.

What we have now is:
0. A client invokes the REST service to create a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but remains in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service fails immediately, because a new Kafka transaction is required
while the old one is still open, so the abortTransaction method is invoked.

What we need to have is:
0. A client invokes the REST service to create a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and the message is available to the consumers.
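
The intended sequence above can be modelled in plain Java. This is only an illustrative sketch, not the application code and not the Spring or Kafka API: the `Resource` class, the `createProduct` method, and the event labels are hypothetical names chosen for the example. It records the order of operations for the success path and for a failed database write, where both transactions are rolled back.

```java
import java.util.ArrayList;
import java.util.List;

public class OnePhaseCommitSketch {

    /** Hypothetical stand-in for a transactional resource (Kafka producer or database). */
    static final class Resource {
        final String name;
        final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void begin()    { log.add("begin " + name); }
        void commit()   { log.add("commit " + name); }
        void rollback() { log.add("rollback " + name); }
    }

    /** Runs the flow from the list above and returns the ordered event log. */
    public static List<String> createProduct(boolean dbWriteFails) {
        List<String> log = new ArrayList<>();
        Resource kafka = new Resource("kafka", log);
        Resource db = new Resource("db", log);

        kafka.begin();                 // step 1: Kafka transaction is opened
        log.add("send message");       // step 1: message is sent inside the transaction
        db.begin();
        try {
            if (dbWriteFails) {
                throw new IllegalStateException("simulated DB failure");
            }
            log.add("persist product"); // step 2: data are persisted
        } catch (RuntimeException e) {
            // A rollback of the database transaction also rolls back Kafka.
            db.rollback();
            kafka.rollback();
            return log;
        }
        db.commit();                   // step 3: database commit comes first
        kafka.commit();                // step 4: Kafka commit follows immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(createProduct(false));
        System.out.println(createProduct(true));
    }
}
```

Note that in a one-phase scheme like this, a failure of the Kafka commit after the database commit has succeeded can no longer undo the database write; the sketch does not attempt to handle that case.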


Here is my configuration for the transaction manager, producer factory, and template:

  public PlatformTransactionManager transactionManager(
      EntityManagerFactory emf,
      DefaultKafkaProducerFactory producerFactory) {
    
    final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
    jpaTransactionManager.setEntityManagerFactory(emf);
    final KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
    kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
    kafkaTransactionManager.setNestedTransactionAllowed(true);
    kafkaTransactionManager.setValidateExistingTransaction(true);
    kafkaTransactionManager.setRollbackOnCommitFailure(true);
    kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return new ChainedTransactionManager(
        new PlatformTransactionManager[] { jpaTransactionManager, kafkaTransactionManager });
    
  }
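
A note on ordering: according to its Javadoc, ChainedTransactionManager starts transactions in the order the managers are listed and commits or rolls back in reverse order. The sketch below is a plain-Java model of that rule only. The class `ChainOrderSketch`, the `run` method, and the string labels are hypothetical, and no Spring classes are involved. It is meant to show which list order yields "database commit first, Kafka commit second".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainOrderSketch {

    /** Models a chain: begin in list order, commit in reverse list order. */
    public static List<String> run(List<String> managers) {
        List<String> events = new ArrayList<>();
        for (String manager : managers) {
            events.add("begin " + manager);
        }
        for (int i = managers.size() - 1; i >= 0; i--) {
            events.add("commit " + managers.get(i));
        }
        return events;
    }

    public static void main(String[] args) {
        // Order used in the configuration above: JPA first, Kafka second.
        System.out.println(run(Arrays.asList("jpa", "kafka")));
        // Reversed order: Kafka first, JPA second.
        System.out.println(run(Arrays.asList("kafka", "jpa")));
    }
}
```

Under this model, the list order { jpaTransactionManager, kafkaTransactionManager } commits Kafka before the database, while listing the Kafka manager first would give the order described in the scenario (Kafka opened first, database committed first). Whether this ordering is related to the reported hang is not established here.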

  @Bean
  public DefaultKafkaProducerFactory producerFactory() {
    
    final Map<String, Object> configMap = new HashMap<>();
    configMap.put("bootstrap.servers", bootstrapServers);
    configMap.put("key.serializer", keySerializer);
    configMap.put("value.serializer", valueSerializer);
    configMap.put("schema.registry.url", schemaRegistryEndpoint);
    configMap.put("enable.idempotence", enableIdempotenceConfig );
    configMap.put("transactional.id", transactionalIdConfig);
    configMap.put("transaction.timeout.ms", transactionTimeoutMs);
    
    final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configMap);
    
    producerFactory.setTransactionIdPrefix(transactionalIdConfig);
    return producerFactory;
  }

  public KafkaTemplate templateCreationCustomer(
      DefaultKafkaProducerFactory producerFactory,
      RecordMessageConverter messageConverter) {
    
    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setMessageConverter(messageConverter);
    return template;
  }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.0
kafka-avro-serializer is 3.3.0

EDIT//
spring-kafka is 1.3.0


was (Author: mauro.giacometti):
Hi Ismal,
I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague
and he's working with me on this project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager
provided by Spring. We chose this approach because we need to implement a one-phase commit. We need
to open a single "unique transaction" (managed by the ChainedTransactionManager) that first opens
a transaction on the Kafka broker and then a transaction on an Oracle DB, in which a write operation is
performed. A final commit on the DB is then performed, immediately followed by a commit
of the Kafka transaction. Every rollback in the JPA transaction must lead to a Kafka transaction
rollback.
Moreover, we're using spring-integration-kafka and Apache Avro to manipulate, serialize, and deserialize
the messages we publish to the Kafka broker.

What we have now is:
0. A client invokes the REST service to create a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but remains in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service fails immediately, because a new Kafka transaction is required
while the old one is still open, so the abortTransaction method is invoked.

What we need to have is:
0. A client invokes the REST service to create a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and the message is available to the consumers.


Here is my configuration for the transaction manager, producer factory, and template:

  public PlatformTransactionManager transactionManager(
      EntityManagerFactory emf,
      DefaultKafkaProducerFactory producerFactory) {
    
    final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
    jpaTransactionManager.setEntityManagerFactory(emf);
    final KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
    kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
    kafkaTransactionManager.setNestedTransactionAllowed(true);
    kafkaTransactionManager.setValidateExistingTransaction(true);
    kafkaTransactionManager.setRollbackOnCommitFailure(true);
    kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return new ChainedTransactionManager(
        new PlatformTransactionManager[] { jpaTransactionManager, kafkaTransactionManager });
    
  }

  @Bean
  public DefaultKafkaProducerFactory producerFactory() {
    
    final Map<String, Object> configMap = new HashMap<>();
    configMap.put("bootstrap.servers", bootstrapServers);
    configMap.put("key.serializer", keySerializer);
    configMap.put("value.serializer", valueSerializer);
    configMap.put("schema.registry.url", schemaRegistryEndpoint);
    configMap.put("enable.idempotence", enableIdempotenceConfig );
    configMap.put("transactional.id", transactionalIdConfig);
    configMap.put("transaction.timeout.ms", transactionTimeoutMs);
    
    final DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configMap);
    
    producerFactory.setTransactionIdPrefix(transactionalIdConfig);
    return producerFactory;
  }

  public KafkaTemplate templateCreationCustomer(
      DefaultKafkaProducerFactory producerFactory,
      RecordMessageConverter messageConverter) {
    
    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setMessageConverter(messageConverter);
    return template;
  }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0

EDIT//
spring-kafka is 1.3.0

> Kafka Producer - initTransaction forever waiting
> ------------------------------------------------
>
>                 Key: KAFKA-6267
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6267
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 0.11.0.1, 0.11.0.2
>            Reporter: Silvio Papa
>         Attachments: controller.log, producer.JPG, server.log
>
>
> In the code shown in the attached image, the producer waits forever in initTransaction with
> the default broker configuration.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
