kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cemalettin Koç (JIRA) <j...@apache.org>
Subject [jira] [Created] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
Date Sun, 25 Mar 2018 10:40:00 GMT
Cemalettin Koç created KAFKA-6713:
-------------------------------------

             Summary: Provide an easy way replace store with a custom one on High-Level Streams
DSL
                 Key: KAFKA-6713
                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 1.0.1
            Reporter: Cemalettin Koç


I am trying to use GlobalKTable with a custom store implementation. In my stores, I would
like to store my `Category` entites and I would like to query them by their name as well.
My custom store has some capabilities beyond `get` such as get by `name`. I also want to get
all entries in a hierarchical way in a lazy fashion. I have other use cases as well.

 

In order to accomplish my task I had to implement  a custom `KeyValueBytesStoreSupplier`, 
`BytesTypeConverter` and 

 
{code:java}
public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]>
{

  private BytesTypeConverter<K, V> converter;

  private KeyValueStore<K, V> delegated;

  public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K,
V> converter) {
    this.converter = converter;
    this.delegated = delegated;
  }

  @Override
  public void put(Bytes key, byte[] value) {
    delegated.put(converter.outerKey(key),
                  converter.outerValue(value));
  }

  @Override
  public byte[] putIfAbsent(Bytes key, byte[] value) {
    V v = delegated.putIfAbsent(converter.outerKey(key),
                                converter.outerValue(value));
    return v == null ? null : value;
  }
  ......

{code}
 

 Type Converter:
{code:java}
public interface TypeConverter<K, IK, V, IV> {

  IK innerKey(final K key);

  IV innerValue(final V value);

  List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);

  List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>>
from);

  V outerValue(final IV value);

  KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);

  KeyValue<Bytes, byte[]>innerKeyValue(final KeyValue<K, V> entry);

  K outerKey(final IK ik);
}
{code}
 

This is unfortunately too cumbersome and hard to maintain.  

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message