No body have implemented Trident with Kafka yet ?
On Sat, Apr 5, 2014 at 3:44 PM, shamsul haque <shams.hq@gmail.com> wrote:
>
> Hi team,
>
> I am making an application to get data from kafka queue and after some
> processing push it again in kafka queue in different topic. I got a help
> from this URL<https://github.com/quintona/trident-kafka-push/blob/master/src/test/java/com/github/quintona/TestTopology.java>.
But data is not going in kafka queue. Below is my Topology and KafkaState
> (implements State) class. Please tell me where i am doing wrong.
>
> public class TestTridentTopology {
>
> public static Logger logger = Logger.getLogger(TestTridentTopology.class);
>
> public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException
{
> String topologyName = null;
> String[] kafkaTopic = null;
> int length = args.length;
> if (args != null && length > 0) {
> topologyName = args[0];
> kafkaTopic = new String[length - 1];
> for (int i = 1; i < length; i++) {
> kafkaTopic[i - 1] = args[i];
> }
> }
> Config conf = new Config();
> conf.setDebug(false);
> conf.setNumWorkers(5);
> conf.setMaxSpoutPending(5);
> conf.setMaxTaskParallelism(3);
> if (topologyName != null) {
> StormSubmitter.submitTopology(topologyName, conf, buildTopology(kafkaTopic));
> } else {
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("test", conf, buildTopology(kafkaTopic));
> Utils.sleep(10000);
> }
> }
>
> /**
> *
> * @param service
> * @return
> */
> public static StormTopology buildTopology(String[] kafkaTopic) {
>
> try {
> BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers");
>
> TridentKafkaConfig config = new TridentKafkaConfig(brokerHost, "testtopic1");
> config.forceStartOffsetTime(-2);
> config.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config);
>
> TridentTopology topology = new TridentTopology();
> TridentState parallelismHint = topology.newStream("feed", spout)
> .shuffle()
> .each(new Fields("str"), new TridentFetcherBolt(), new Fields("textJSON"))
> .partitionPersist(KafkaState.transactional("testtopic2", new KafkaState.Options()),
new KafkaStateUpdater("textJSON"))
> .parallelismHint(1);
>
> logger.warn("parallelismHint" + parallelismHint);
>
> return topology.build();
> } catch (Exception e) {
> logger.error("Exception: ", e);
> }
> return null;
>
> }
> }
>
>
> public class KafkaState implements State {
>
> private static final Logger logger = Logger.getLogger(KafkaState.class);
>
> ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();
>
> public static class Options implements Serializable {
>
> public String zookeeperHost = "127.0.0.1";
> public int zookeeperPort = 2181;
> public String serializerClass = "kafka.serializer.StringEncoder";
> public String kafkaConnect = "127.0.0.1:9092";
>
> public Options() {
> logger.debug("KafkaState::Options()");
> }
>
> public Options(String zookeeperHost, int zookeeperPort, String serializerClass, String
topicName) {
> this.zookeeperHost = zookeeperHost;
> this.zookeeperPort = zookeeperPort;
> this.serializerClass = serializerClass;
> }
> }
>
> public static StateFactory transactional(String topic, Options options) {
> logger.debug("KafkaState::transactional: " + topic);
> return new Factory(topic, options, true);
> }
>
> public static StateFactory nonTransactional(String topic, Options options) {
> return new Factory(topic, options, false);
> }
>
> protected static class Factory implements StateFactory {
>
> private Options options;
> private String topic;
> boolean transactional;
>
> public Factory(String topic, Options options, boolean transactional) {
> this.options = options;
> this.topic = topic;
> this.transactional = transactional;
> }
>
> @Override
> public State makeState(Map conf, IMetricsContext metrics,
> int partitionIndex, int numPartitions) {
> return new KafkaState(topic, options, transactional);
> }
>
> }
>
> private Options options;
> private String topic;
> Producer<String, String> producer;
> private boolean transactional;
>
> public KafkaState(String topic, Options options, boolean transactional) {
> this.topic = topic;
> this.options = options;
> this.transactional = transactional;
> Properties props = new Properties();
> props.put("zk.connect", options.zookeeperHost + ":" + Integer.toString(options.zookeeperPort));
> props.put("serializer.class", options.serializerClass);
> props.put("metadata.broker.list", options.kafkaConnect);
>
> ProducerConfig config = new ProducerConfig(props);
> producer = new Producer<>(config);
> logger.debug("producer initialized successfully." + producer);
> }
>
> @Override
> public void beginCommit(Long txid) {
> logger.debug("KafkaState::beginCommit");
> if (messages.size() > 0) {
> throw new RuntimeException("Kafka State is invalid, the previous transaction
didn't flush");
> }
> }
>
> public void enqueue(String message) {
> logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message);
> if (transactional) {
> messages.add(message);
> } else {
> sendMessage(message);
> }
> }
>
> private void sendMessage(String message) {
> KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic,
message);
> producer.send(data);
> }
>
> @Override
> public void commit(Long txid) {
> String message = messages.poll();
> logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message);
>
> while (message != null) {
> sendMessage(message);
> message = messages.poll();
> }
> }
> }
>
> Thanks
>
>
>
>
|