apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raja.Aravapalli <Raja.Aravapa...@target.com>
Subject Re: [EXTERNAL] Re: KafkaSinglePortInputOperator
Date Tue, 06 Dec 2016 11:28:38 GMT

Thanks a lot for the response Chaitanya!!

Sharing more details for your reference and suggestions !!

Application.java:

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import java.util.Properties;


public class Application implements StreamingApplication {

    public void populateDAG(DAG dag, Configuration conf)
    {


        Properties props = new Properties();

        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location","client.truststore.jks");
        props.put("ssl.truststore.password","******");
        props.put("ssl.keystore.location","server.keystore.jks");
        props.put("ssl.keystore.password","******");
        props.put("schema.registry.url", "http://********:8081");
        props.put("enable.auto.commit", "false");

        KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
        in.setInitialPartitionCount(1);
        in.setTopics("***********");
        in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
        in.setClusters("********:9093");

        LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
        out.setFilePath("hdfs://******/*********");
        out.setFileName("test");
        out.setMaxLength(1024);

        dag.addStream("data", in.outputPort, out.input);

    }


}


Also, sharing POM.xml below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tgt.hdp.mighty</groupId>
    <artifactId>test-apex</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Version numbers and shared settings referenced elsewhere in this POM. -->
    <properties>
        <!-- Used for every org.apache.apex dependency below and for the DT-Engine-Version manifest entry. -->
        <apex.version>3.4.0</apex.version>
        <!-- Written to the Class-Path manifest entry of the app package. -->
        <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
        <!-- NOTE(review): hadoop.version, hbase.version and kafka.version are not referenced anywhere in this POM. -->
        <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
        <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
        <kafka.version>0.9.0.1</kafka.version>
        <!-- Version of the org.apache.kafka:kafka_2.11 dependency. -->
        <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
        <!-- Version of io.confluent:kafka-avro-serializer. -->
        <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
        <avro.version>1.7.7</avro.version>
        <!-- Version of com.googlecode.json-simple:json-simple. -->
        <json.version>1.1</json.version>
        <jodatime.version>2.9.1</jodatime.version>
        <!-- Version of de.javakaffee:kryo-serializers (property name is spelled "kyro"). -->
        <kyroserializer.version>0.38</kyroserializer.version>
        <junit.version>4.10</junit.version>
    </properties>

    <!-- Additional artifact repositories consulted besides the Maven defaults. -->
    <!-- NOTE(review): all three URLs use plain http; recent Maven releases may refuse non-https repositories - confirm. -->
    <repositories>
        <!-- Hortonworks release repository. -->
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
        <!-- Hortonworks jetty-hadoop repository. -->
        <repository>
            <id>HDP Jetty Hadoop</id>
            <name>HDP Jetty Hadoop</name>
            <url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
            <layout>default</layout>
        </repository>
        <!-- Confluent repository; presumably the source of the "-cp1" Kafka build and kafka-avro-serializer. -->
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>

    <!-- Libraries required by the application. -->
    <!-- NOTE(review): every org.apache.apex artifact, core and Malhar alike, is pinned to ${apex.version};
         confirm that the Malhar artifacts are actually published under that version. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-library</artifactId>
            <version>${apex.version}</version>
            <!--
                 If you know that your application does not need transitive dependencies pulled
in by malhar-library,
                 uncomment the following to reduce the size of your app package.
            -->
            <!--
            <exclusions>
              <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
              </exclusion>
            </exclusions>
            -->
        </dependency>
        <!-- "provided" scope: needed to compile, not copied by the runtime-scoped copy-dependencies step. -->
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-common</artifactId>
            <version>${apex.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Engine is only on the test classpath. -->
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-engine</artifactId>
            <version>${apex.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- Presumably provides the com.datatorrent.contrib.kafka operators imported by Application.java. -->
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-contrib</artifactId>
            <version>${apex.version}</version>
        </dependency>

        <!-- Presumably provides org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator used by Application.java. -->
        <dependency>
           <groupId>org.apache.apex</groupId>
            <artifactId>malhar-kafka</artifactId>
            <version>${apex.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

        <!-- Kafka built for Scala 2.11, at the Confluent-packaged version. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.kafka.version}</version>
        </dependency>

        <!-- Confluent Avro serializer, with its log4j and slf4j-log4j12 bindings excluded. -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.srlzr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${jodatime.version}</version>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>${kyroserializer.version}</version>
        </dependency>

    </dependencies>

    <!-- Build pipeline: compile, copy runtime dependencies, assemble the app package jar,
         rename it to .apa and attach the .apa as a project artifact. -->
    <build>
        <plugins>
            <!-- Eclipse project generation, with source jars downloaded. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.9</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                </configuration>
            </plugin>
            <!-- Compiles as Java 1.7, UTF-8 sources, debug info on, optimisation off. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.7</source>
                    <target>1.7</target>
                    <debug>true</debug>
                    <optimize>false</optimize>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <!-- Copies runtime-scope dependencies into target/deps during prepare-package. -->
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>target/deps</outputDirectory>
                            <includeScope>runtime</includeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Assembles the app package as <artifactId>-<version>-apexapp using src/assemble/appPackage.xml
                 and writes the DT-* manifest entries. -->
            <!-- NOTE(review): no <version> is declared for this plugin, so the version Maven resolves is not pinned. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>app-package-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
                            <appendAssemblyId>false</appendAssemblyId>
                            <descriptors>
                                <descriptor>src/assemble/appPackage.xml</descriptor>
                            </descriptors>
                            <archiverConfig>
                                <defaultDirectoryMode>0755</defaultDirectoryMode>
                            </archiverConfig>
                            <archive>
                                <manifestEntries>
                                    <Class-Path>${apex.apppackage.classpath}</Class-Path>
                                    <DT-Engine-Version>${apex.version}</DT-Engine-Version>
                                    <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id>
                                    <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
                                    <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
                                    <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
                                    <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
                                </manifestEntries>
                            </archive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Two Ant executions: rename the assembled jar to .apa (package phase) and
                 recreate the xml-javadoc resource directory (generate-resources phase). -->
            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Depends on the assembly plugin above having produced the -apexapp.jar in the same phase. -->
                    <execution>
                        <phase>package</phase>
                        <configuration>
                            <target>
                                <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar"
                                      tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa"
/>
                            </target>
                        </configuration>
                        <goals>
                            <goal>run</goal>
                        </goals>
                    </execution>
                    <execution>
                        <!-- create resource directory for xml javadoc-->
                        <id>createJavadocDirectory</id>
                        <phase>generate-resources</phase>
                        <configuration>
                            <tasks>
                                <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/>
                                <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/>
                            </tasks>
                        </configuration>
                        <goals>
                            <goal>run</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Attaches the renamed .apa file to the project as an artifact of type "apa". -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.9.1</version>
                <executions>
                    <execution>
                        <id>attach-artifacts</id>
                        <phase>package</phase>
                        <goals>
                            <goal>attach-artifact</goal>
                        </goals>
                        <configuration>
                            <artifacts>
                                <artifact>
                                    <file>target/${project.artifactId}-${project.version}.apa</file>
                                    <type>apa</type>
                                </artifact>
                            </artifacts>
                            <skipAttach>false</skipAttach>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>

    </build>

</project>



Regards,
Raja.

From: Chaitanya Chebolu <chaitanya@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, December 6, 2016 at 4:28 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: [EXTERNAL] Re: KafkaSinglePortInputOperator

Hi Raja,

  Could you please share the Application Master logs and Kafka operator container logs.

Regards,
Chaitanya

On Tue, Dec 6, 2016 at 4:17 PM, Raja.Aravapalli <Raja.Aravapalli@target.com<mailto:Raja.Aravapalli@target.com>>
wrote:

Hi Team,

I am using “KafkaSinglePortInputOperator” to connect to a SSL Secured topic in Kafka 0.9!!

Unfortunately… my apex application is not going to “RUNNING” state…!! Only staying
in ACCEPTED State and then going into FAILED state!! I don’t see much information in the
logs…!! ☹

Can someone please help fix the issue…. We have immediate need to read messages from kafka
0.9 SSL configured topics…

Please advise!


Thanks very much in advance.


Regards,
Raja.

Mime
View raw message