kafka-jira mailing list archives

From Cemalettin Koç (JIRA) <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
Date Mon, 26 Mar 2018 20:21:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414469#comment-16414469 ]

Cemalettin Koç edited comment on KAFKA-6713 at 3/26/18 8:20 PM:
----------------------------------------------------------------

Hi [~guozhang], I have copied the internal TypeConverter interface and added some other methods
for my needs. What I actually want is to use a custom in-memory store and then pass this
`category in memory store` to my `category service`.

 

 
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore<Long, Category> {
  // implementation
}
{code}
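
To illustrate the kind of capability such a store might add beyond `get`, here is a minimal sketch. It is not the attached implementation: it uses a plain-Java stand-in rather than extending `InMemoryKeyValueStore`, and the `Category` fields, `CategoryStoreSketch`, `getByName`, and `childrenOf` are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the Category entity; the real fields are not shown in this thread.
final class Category {
  final long id;
  final String name;
  final Long parentId; // null for a root category

  Category(long id, String name, Long parentId) {
    this.id = id;
    this.name = name;
    this.parentId = parentId;
  }
}

// Plain-Java sketch of a typed in-memory store with a secondary index on name.
// It does not extend any Kafka class; it only shows the extra lookups.
class CategoryStoreSketch {
  private final Map<Long, Category> byId = new HashMap<>();
  private final Map<String, Long> idByName = new HashMap<>();

  void put(Long id, Category category) {
    Category previous = byId.put(id, category);
    if (previous != null) {
      // Drop the stale name entry, but only if it still points at this id.
      idByName.remove(previous.name, id);
    }
    idByName.put(category.name, id);
  }

  Category get(Long id) {
    return byId.get(id);
  }

  // Lookup by name, assuming category names are unique.
  Optional<Category> getByName(String name) {
    Long id = idByName.get(name);
    return id == null ? Optional.empty() : Optional.ofNullable(byId.get(id));
  }

  // Direct children of a category; a linear scan keeps the sketch short.
  List<Category> childrenOf(long parentId) {
    List<Category> children = new ArrayList<>();
    for (Category c : byId.values()) {
      if (c.parentId != null && c.parentId == parentId) {
        children.add(c);
      }
    }
    return children;
  }
}
```

A service holding a reference to such a store can call `getByName` directly, which is why passing the store instance around is useful here.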
 

I created an instance of `CategoryInMemoryStore` and passed it to my StreamsBuilder like
this:

 
{code:java}
public GlobalKTable<Long, Category> categoryKGlobalTable(StreamsBuilder streamsBuilder)
{
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
                                    Materialized.<Long, Category>as(supplier)
                                        .withCachingDisabled()
                                        .withKeySerde(Serdes.Long())
                                        .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the files I have attached is to create a customized in-memory key-value
store implementation.

I have also attached the implementations that are used above.
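
For readers without the attachments, the delegation idea can be sketched roughly as follows. This is an assumption-laden simplification, not the attached code: it uses raw `byte[]` keys and a `java.util.Map` in place of Kafka's `Bytes` and `KeyValueStore`, and `BytesConverterSketch`, `LongStringConverter`, and `DelegatingStoreSketch` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified converter between typed ("outer") and byte-level ("inner") forms.
interface BytesConverterSketch<K, V> {
  byte[] innerKey(K key);
  byte[] innerValue(V value);
  K outerKey(byte[] key);
  V outerValue(byte[] value);
}

// Example converter for Long keys and String values.
final class LongStringConverter implements BytesConverterSketch<Long, String> {
  public byte[] innerKey(Long key) {
    return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
  }

  public byte[] innerValue(String value) {
    return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
  }

  public Long outerKey(byte[] key) {
    return ByteBuffer.wrap(key).getLong();
  }

  public String outerValue(byte[] value) {
    return value == null ? null : new String(value, StandardCharsets.UTF_8);
  }
}

// Byte-level facade that forwards every call to a typed store (a Map stands in for it here).
final class DelegatingStoreSketch<K, V> {
  private final Map<K, V> delegated;
  private final BytesConverterSketch<K, V> converter;

  DelegatingStoreSketch(Map<K, V> delegated, BytesConverterSketch<K, V> converter) {
    this.delegated = delegated;
    this.converter = converter;
  }

  void put(byte[] key, byte[] value) {
    delegated.put(converter.outerKey(key), converter.outerValue(value));
  }

  byte[] get(byte[] key) {
    V value = delegated.get(converter.outerKey(key));
    return value == null ? null : converter.innerValue(value);
  }

  // Returns the previously stored value in byte form, or null if the key was absent.
  byte[] putIfAbsent(byte[] key, byte[] value) {
    V previous = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
    return previous == null ? null : converter.innerValue(previous);
  }
}
```

The typed map stays directly usable by application code while the byte-level facade is what a framework expecting serialized keys and values would see.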

 

 


> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Priority: Major
>              Labels: streaming-api
>         Attachments: BytesTypeConverter.java, DelegatingByteStore.java, TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my stores, I would
> like to store my `Category` entities and query them by name as well. My custom store has
> capabilities beyond `get`, such as get by `name`. I also want to get all entries
> hierarchically and lazily. I have other use cases as well.
>  
> To accomplish this, I had to implement a custom `KeyValueBytesStoreSupplier`, a
> `BytesTypeConverter`, and the following `DelegatingByteStore`:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>   @Override
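>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
>     // Return the previously stored value in serialized form, or null if the key was absent
>     // (assumes BytesTypeConverter fixes the inner value type to byte[]).
>     return v == null ? null : converter.innerValue(v);
>   }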
>   ......
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<K, IK, V, IV> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<Bytes, byte[]> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
