Apache Kafka Integration With Storm
1. Introduction
This is an in-depth article related to the Apache Kafka producer example. Apache Kafka is an Apache open-source project. It was initially created on Linkedin. Kafka framework was created in java and scala. It supports publish-subscribe messaging and is fault-tolerant. It is scalable and performs for high-volume messaging. Zookeeper is the basic component that manages the Apache Kafka Server. Kafka has features related to reliability, scalability, performance, distributed logging, and durability.
2. Apache Kafka – Storm Integration
Apache Storm was developed by the BackType team led by Nathan Marz. The storm is used for real-time data processing. It can perform at speeds of 1 million tuples per second on a single node. Spouts are the data sources and bolts ae the data processing pipelines. The topology consists of Bolts and Spouts. Data is streamed using the storm topologies using Kafka for publishing and subscribe messaging.
2.1 Prerequisites
Java 7 or 8 is required on the Linux, windows, or Mac operating system. Apache Storm 0.95 and Kafka 2.11-0.9.0.0 are needed for this article.
2.2 Download
You can download Java 8 can be downloaded from the Oracle website. Kafka framework’s latest releases are available from this website. Apache storm can be downloaded from this site.
2.3 Setup
You can set the environment variables for JAVA_HOME and PATH. They can be set as shown below:
Setup
JAVA_HOME="/desktop/jdk1.8.0_73" export JAVA_HOME PATH=$JAVA_HOME/bin:$PATH export PATH
2.4 How to download and install Apache Kafka & Storm
Apache Kafka’s latest releases are available from the Apache Kafka website. After downloading the zip file can be extracted to a folder. Storm zip file can be extracted into a folder after downloading from the site.
To start the zookeeper, you can use the command below:
Zoo keeper
bin/zookeeper-server-start.sh config/zookeeper.properties
The output of the above command is shown as below:
Zoo keeper Output
apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/zookeeper-server-start.sh config/zookeeper.properties [2021-05-27 00:39:27,510] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,514] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2021-05-27 00:39:27,559] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,561] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) [2021-05-27 00:39:27,607] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.version=1.8.0_101 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.version=10.16 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.name=bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,610] INFO Server environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,632] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,676] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/zookeeper-server-start.sh config/zookeeper.properties [2021-05-27 00:39:27,510] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,514] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2021-05-27 00:39:27,514] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2021-05-27 00:39:27,559] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2021-05-27 00:39:27,561] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) [2021-05-27 00:39:27,607] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:host.name=localhost (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.version=1.8.0_101 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,608] INFO Server environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:java.compiler= (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:os.version=10.16 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.name=bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,609] INFO Server environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,610] INFO Server environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO tickTime set to 3000 (orgapache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,631] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,632] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2021-05-27 00:39:27,676] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
You can now start the apache kafka server using the command below
Apache Kafka Server Run Command
bin/kafka-server-start.sh config/server.properties
The output of the above command is shown as below:
Apache Kafka Server Output
apples-MacBook-Air:kafka_2.11-0.9.0.0 bhagvan.kommadi$ bin/kafka-server-start.sh config/server.properties [2021-05-27 00:42:40,482] INFO KafkaConfig values: advertised.host.name = null metric.reporters = [] quota.producer.default = 9223372036854775807 offsets.topic.num.partitions = 50 log.flush.interval.messages = 9223372036854775807 auto.create.topics.enable = true controller.socket.timeout.ms = 30000 log.flush.interval.ms = null principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder replica.socket.receive.buffer.bytes = 65536 min.insync.replicas = 1 replica.fetch.wait.max.ms = 500 num.recovery.threads.per.data.dir = 1 ssl.keystore.type = JKS default.replication.factor = 1 ssl.truststore.password = null log.preallocate = false sasl.kerberos.principal.to.local.rules = [DEFAULT] fetch.purgatory.purge.interval.requests = 1000 ssl.endpoint.identification.algorithm = null replica.socket.timeout.ms = 30000 message.max.bytes = 1000012 num.io.threads = 8 offsets.commit.required.acks = -1 log.flush.offset.checkpoint.interval.ms = 60000 delete.topic.enable = false quota.window.size.seconds = 1 ssl.truststore.type = JKS offsets.commit.timeout.ms = 5000 quota.window.num = 11 zookeeper.connect = localhost:2181 authorizer.class.name = num.replica.fetchers = 1 log.retention.ms = null log.roll.jitter.hours = 0 log.cleaner.enable = false offsets.load.buffer.size = 5242880 log.cleaner.delete.retention.ms = 86400000 ssl.client.auth = none controlled.shutdown.max.retries = 3 queued.max.requests = 500 offsets.topic.replication.factor = 3 log.cleaner.threads = 1 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 socket.request.max.bytes = 104857600 ssl.trustmanager.algorithm = PKIX zookeeper.session.timeout.ms = 6000 log.retention.bytes = -1 sasl.kerberos.min.time.before.relogin = 60000 zookeeper.set.acl = false connections.max.idle.ms = 600000 offsets.retention.minutes = 1440 replica.fetch.backoff.ms = 1000 inter.broker.protocol.version = 0.9.0.X log.retention.hours = 168 num.partitions = 1 listeners = PLAINTEXT://0.0.0.0:9092 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] log.roll.ms = null log.flush.scheduler.interval.ms = 9223372036854775807 ssl.cipher.suites = null log.index.size.max.bytes = 10485760 ssl.keymanager.algorithm = SunX509 security.inter.broker.protocol = PLAINTEXT replica.fetch.max.bytes = 1048576 advertised.port = null log.cleaner.dedupe.buffer.size = 524288000 replica.high.watermark.checkpoint.interval.ms = 5000 log.cleaner.io.buffer.size = 524288 sasl.kerberos.ticket.renew.window.factor = 0.8 zookeeper.connection.timeout.ms = 6000 controlled.shutdown.retry.backoff.ms = 5000 log.roll.hours = 168 log.cleanup.policy = delete host.name = log.roll.jitter.ms = null max.connections.per.ip = 2147483647 offsets.topic.segment.bytes = 104857600 background.threads = 10 quota.consumer.default = 9223372036854775807 request.timeout.ms = 30000 log.index.interval.bytes = 4096 log.dir = /tmp/kafka-logs log.segment.bytes = 1073741824 log.cleaner.backoff.ms = 15000 offset.metadata.max.bytes = 4096 ssl.truststore.location = null group.max.session.timeout.ms = 30000 ssl.keystore.password = null zookeeper.sync.time.ms = 2000 port = 9092 log.retention.minutes = null log.segment.delete.delay.ms = 60000 log.dirs = /tmp/kafka-logs controlled.shutdown.enable = true compression.type = producer max.connections.per.ip.overrides = sasl.kerberos.kinit.cmd = /usr/bin/kinit log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 auto.leader.rebalance.enable = true leader.imbalance.check.interval.seconds = 300 log.cleaner.min.cleanable.ratio = 0.5 replica.lag.time.max.ms = 10000 num.network.threads = 3 ssl.key.password = null reserved.broker.max.id = 1000 metrics.num.samples = 2 socket.send.buffer.bytes = 102400 ssl.protocol = TLS socket.receive.buffer.bytes = 102400 ssl.keystore.location = null replica.fetch.min.bytes = 1 unclean.leader.election.enable = true group.min.session.timeout.ms = 6000 log.cleaner.io.buffer.load.factor = 0.9 offsets.retention.check.interval.ms = 600000 producer.purgatory.purge.interval.requests = 1000 metrics.sample.window.ms = 30000 broker.id = 0 offsets.topic.compression.codec = 0 log.retention.check.interval.ms = 300000 advertised.listeners = null leader.imbalance.per.broker.percentage = 10 (kafka.server.KafkaConfig) [2021-05-27 00:42:40,633] INFO starting (kafka.server.KafkaServer) [2021-05-27 00:42:40,641] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2021-05-27 00:42:40,660] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2021-05-27 00:42:40,672] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,672] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.version=1.8.0_101 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,675] INFO Client environment:java.class.path=:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-http-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-client-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-core-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zkclient-0.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-databind-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-media-jaxb-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-api-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-tools-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-guava-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/argparse4j-0.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-scaladoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-security-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-xml_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-log4j12-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-javadoc.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-api-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.servlet-api-3.1.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/log4j-1.2.17.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/slf4j-api-1.7.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/validation-api-1.1.0.Final.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-sources.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javassist-3.18.1-GA.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0-test.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.annotation-api-1.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/scala-library-2.11.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-json-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/metrics-core-2.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka-clients-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/lz4-1.2.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-locator-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-io-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/osgi-resource-locator-1.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jopt-simple-3.2.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/snappy-java-1.1.1.7.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-annotations-2.5.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/kafka_2.11-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/zookeeper-3.4.6.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-util-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jetty-server-9.2.12.v20150709.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-file-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/connect-runtime-0.9.0.0.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/hk2-utils-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-2.4.0-b31.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-server-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/javax.inject-1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jersey-common-2.22.1.jar:/Users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/bin/../libs/jackson-jaxrs-base-2.5.4.jar (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/ (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:os.version=10.16 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.name=bhagvan.kommadi (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.home=/Users/bhagvan.kommadi (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,676] INFO Client environment:user.dir=/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0 (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,677] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@23f7d05d (org.apache.zookeeper.ZooKeeper) [2021-05-27 00:42:40,698] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient) [2021-05-27 00:42:40,705] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,827] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,928] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x179aa14e9770000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2021-05-27 00:42:40,929] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2021-05-27 00:42:41,229] INFO Loading logs. (kafka.log.LogManager) [2021-05-27 00:42:41,301] INFO Completed load of log my-topic-11 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,312] INFO Completed load of log kafkatopic-2 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,316] INFO Completed load of log kafkatopic-5 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,321] INFO Completed load of log my-topic-10 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,329] INFO Completed load of log kafka_topic_sentences-0 with log end offset 30 (kafka.log.Log) [2021-05-27 00:42:41,333] INFO Completed load of log kafkatopic-4 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,339] INFO Completed load of log kafkatopic-3 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,345] INFO Completed load of log my-topic-3 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,354] INFO Completed load of log my-topic-4 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,358] INFO Completed load of log kafkatopic-11 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,363] INFO Completed load of log my-topic-5 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,367] INFO Completed load of log my-topic-2 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,375] INFO Completed load of log my-first-topic-0 with log end offset 1006174 (kafka.log.Log) [2021-05-27 00:42:41,380] INFO Completed load of log kafkatopic-10 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,391] INFO Completed load of log kafka_topic_phrases-0 with log end offset 20 (kafka.log.Log) [2021-05-27 00:42:41,396] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,404] INFO Completed load of log newtopic-0 with log end offset 14 (kafka.log.Log) [2021-05-27 00:42:41,408] INFO Completed load of log kafkatopic-6 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,416] INFO Completed load of log kafkatopic-1 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,420] INFO Completed load of log kafkatopic-8 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,424] INFO Completed load of log my-topic-12 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,428] INFO Completed load of log kafkatopic-9 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,432] INFO Completed load of log kafkatopic-0 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,438] INFO Completed load of log kafkatopic-7 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,441] INFO Completed load of log kafkatopic-12 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,445] INFO Completed load of log my-topic-7 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,449] INFO Completed load of log my-topic-0 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,456] INFO Completed load of log my-topic-9 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,463] INFO Completed load of log my-topic-8 with log end offset 2 (kafka.log.Log) [2021-05-27 00:42:41,468] INFO Completed load of log my-topic-1 with log end offset 1 (kafka.log.Log) [2021-05-27 00:42:41,473] INFO Completed load of log my-topic-6 with log end offset 0 (kafka.log.Log) [2021-05-27 00:42:41,477] INFO Completed load of log ExampleTopic-0 with log end offset 10 (kafka.log.Log) [2021-05-27 00:42:41,482] INFO Logs loading complete. (kafka.log.LogManager) [2021-05-27 00:42:41,483] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2021-05-27 00:42:41,512] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2021-05-27 00:42:48,658] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2021-05-27 00:42:48,663] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer) [2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:48,705] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:48,786] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:48,801] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:48,801] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2021-05-27 00:42:49,494] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2021-05-27 00:42:49,511] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator) [2021-05-27 00:42:49,513] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:49,516] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator) [2021-05-27 00:42:49,517] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2021-05-27 00:42:49,527] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager) [2021-05-27 00:42:49,562] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2021-05-27 00:42:49,563] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2021-05-27 00:42:49,571] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2021-05-27 00:42:49,589] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:49,594] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2021-05-27 00:42:49,597] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(0.0.0.0,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2021-05-27 00:42:49,613] INFO Kafka version : 0.9.0.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-05-27 00:42:49,613] INFO Kafka commitId : fc7243c2af4b2b4a (org.apache.kafka.common.utils.AppInfoParser) [2021-05-27 00:42:49,614] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [2021-05-27 00:42:49,986] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager) [2021-05-27 00:42:50,209] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-topic,5],[test,0],[kafkatopic,11],[my-topic,0],[my-topic,7],[kafkatopic,3],[kafkatopic,1],[kafkatopic,6],[kafkatopic,12],[kafkatopic,7],[kafkatopic,4],[kafkatopic,2],[kafka_topic_phrases,0],[kafkatopic,8],[kafka_topic_sentences,0],[my-topic,4],[kafkatopic,9],[my-topic,12],[my-topic,6],[my-topic,11],[my-topic,10],[my-topic,9],[my-topic,2],[my-first-topic,0],[ExampleTopic,0],[kafkatopic,5],[kafkatopic,0],[my-topic,3],[newtopic,0],[my-topic,1],[kafkatopic,10],[my-topic,8] (kafka.server.ReplicaFetcherManager)
2.5 Apache Kafka – Storm Api
In Storm, spout and bolts are used for data sources and pipelines respectively. Messages sent to a Kafka topic are consumed by the spout. Bolt takes the data as a stream from Spout. You can have different operations such as executing functions, tuple filters, aggregation of data as a stream, joins, and retrieving from data sources. The topology consists of spouts and bolts. Integration of Kafka and Storm is based on BrokerHosts, Kafka Config,SchemeAsMultiScheme, and SpoutConfig API. ZKHosts and StaticHosts are implementations of BrokerHosts Interface. ZKHosts track Kafka brokers using Zookeeper for storing the details. StaticHosts help in setting the Kafka Brokers and the details. ZkHosts is a better way as it is performant compared to StaticHosts. You can configure the Kafka cluster using KafkaConfig. SpoutConfig is used for storing the Zookeeper infomation. SchemeAsMultiScheme helps in transforming the data (ByteBuffer form) into Storm tuples. KafkaSpout helps in storm integration with Kafka.
2.6 Apache Kafka -Storm Integration Example
In this example, you can see the topology setup with spout and bolt. Let us look at the topology class first.
Apache Kafka Storm Integration – Topology
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormIntegrationExample {
public static void main(String[] args) throws Exception{
Config stormConfig = new Config();
stormConfig.setDebug(true);
stormConfig.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "kafka_topic_phrases";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfiguration = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
spoutConfiguration.bufferSizeBytes = 1024 * 1024 * 4;
spoutConfiguration.fetchSizeBytes = 1024 * 1024 * 4;
spoutConfiguration.forceFromStart = true;
spoutConfiguration.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafka-spout", new KafkaSpout(spoutConfiguration));
topologyBuilder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
topologyBuilder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("KafkaStormSample", stormConfig, topologyBuilder.createTopology());
Thread.sleep(10000);
localCluster.shutdown();
}
}
The code below shows the PhraseSplitBolt which splits the phrases into words.
Apache Kafka Storm Integration – PhraseSplitBolt
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class PhraseSplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map getComponentConfiguration() {
return null;
}
}
Now let us look at WordCountBolt class.
Apache Kafka Storm Integration – WordCountBolt
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class WordCountBolt implements IRichBolt{
Map counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map getComponentConfiguration() {
return null;
}
}
On kafka front, You can have a Simple producer sending messages. The source code of the SimpleProducer class is shown below.
Apache Kafka Storm Integration – WordCountBolt
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducerExample {
public static void main(String[] args) throws Exception{
if(args.length == 0){
System.out.println("Enter the topic to be created ");
return;
}
String exampleTopicName = args[0].toString();
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer
(properties);
String[] strings = new String[10];
strings[0] = "hi";
strings[1] = "kafka test";
strings[2] = "storm check";
strings[3] = "spark job";
strings[4] = "message";
strings[5] = "operator";
strings[6] = "modulo";
strings[7] = "remainder";
strings[8] = "backtype";
strings[9] = "utility";
for(int i = 0; i < 10; i++)
{
producer.send(new ProducerRecord(exampleTopicName,strings[i]));
System.out.println("Message sent successfully "+ strings[i]);
Thread.sleep(100);
}
producer.close();
}
}
You can compile the code using the command below
Kafka Simple Producer Compilation Command
javac -cp "$KAFKA_HOME/libs/*" SimpleProducerExample.java
First, let us start with the zookeeper and Kafka server. You can compile all the storm classes using the command below. Ensure that the log4j-over-slf4j-1.6.6.jar is not part of the classpath in the command below. curator-client-2.9.1.jar,curator-framework-2.9.1, and guava-11.0.2.jar can be included in the classpath.
Storm side code -Compilation
apples-MacBook-Air:apache_stom bhagvan.kommadi$ javac -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." *.java
Then you can execute the storm side code using the command below
Storm side code – Execution
apples-MacBook-Air:apache_stom bhagvan.kommadi$ javac -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." *.java
Now you can start sending the messages to the kafka topic after creating the topic.
Kafka Simple Producer Compilation Command
java -cp "$KAFKA_HOME/libs/*" SimpleProducerExample kafka_topic_phrases
The output of the command executed above is shown below:
Apache Kafka Server Output
apples-MacBook-Air:apachekafkaproducer bhagvan.kommadi$ java -cp "$KAFKA_HOME/libs/*:." SimpleProducerExample kafka_topic_phrases Message sent successfully hi Message sent successfully kafka test Message sent successfully storm check Message sent successfully spark job Message sent successfully message Message sent successfully operator Message sent successfully modulo Message sent successfully remainder Message sent successfully backtype Message sent successfully utility
On the storm side, the output of the executed command is shown below.
Apache Kafka Server Output
apples-MacBook-Air:apache_stom bhagvan.kommadi$ java -cp "/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/*:/users/bhagvan.kommadi/downloads/apache_stom/lib/*:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/*:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/*:." KafkaStormSample
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/bhagvan.kommadi/Desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
3345 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:host.name=localhost
3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_101
3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre
3370 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:.
3372 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
3372 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:java.compiler=
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:os.version=10.16
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.name=bhagvan.kommadi
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.home=/Users/bhagvan.kommadi
3373 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Client environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:host.name=localhost
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_101
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/Contents/Home/jre
3391 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.class.path=/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/storm-core-0.9.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-stacktrace-0.2.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/disruptor-2.10.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/minlog-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/compojure-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/reflectasm-1.07-shaded.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-core-1.1.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clout-1.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-logging-1.1.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/objenesis-1.2.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-jetty-adapter-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/slf4j-api-1.7.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jgrapht-core-0.9.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.macro-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/hiccup-0.3.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/servlet-api-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/json-simple-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/math.numeric-tower-0.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-util-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/carbonite-1.4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-core-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/chill-java-0.3.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/joda-time-2.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.cli-0.2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-io-2.4.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-codec-1.6.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/tools.logging-0.2.3.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-servlet-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jetty-6.1.26.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/snakeyaml-1.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-exec-1.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/asm-4.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-lang-2.5.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/core.incubator-0.1.0.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/jline-2.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/kryo-2.21.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/commons-fileupload-1.2.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/ring-devel-0.3.11.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/logback-classic-1.0.13.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clj-time-0.4.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/lib/clojure-1.5.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-jcl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-1.2-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-core-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-client-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/curator-framework-2.9.1.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-slf4j-impl-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/guava-11.0.2.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/log4j-api-2.3.jar:/users/bhagvan.kommadi/downloads/apache_stom/lib/kafka-clients-0.9.0.1.jar:/users/bhagvan.kommadi/desktop/apache-storm-0.9.5/external/storm-kafka/storm-kafka-0.9.5.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-http-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-log4j-appender-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-client-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-core-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zkclient-0.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-databind-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-media-jaxb-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-api-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-tools-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-guava-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/argparse4j-0.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-scaladoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-security-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-xml_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-parser-combinators_2.11-1.0.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-servlet-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-api-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.servlet-api-3.1.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-json-provider-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/log4j-1.2.17.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-module-jaxb-annotations-2.5.4.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/slf4j-api-1.7.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/validation-api-1.1.0.Final.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-sources.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javassist-3.18.1-GA.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-test.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.annotation-api-1.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/scala-library-2.11.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-json-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/metrics-core-2.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka-clients-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/lz4-1.2.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-locator-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-io-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/osgi-resource-locator-1.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/aopalliance-repackaged-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jopt-simple-3.2.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/snappy-java-1.1.1.7.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-annotations-2.5.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/zookeeper-3.4.6.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-util-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jetty-server-9.2.12.v20150709.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-file-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/connect-runtime-0.9.0.0.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.ws.rs-api-2.0.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/hk2-utils-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-container-servlet-core-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-2.4.0-b31.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-server-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/javax.inject-1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jersey-common-2.22.1.jar:/users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/jackson-jaxrs-base-2.5.4.jar:.
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.library.path=/Users/bhagvan.kommadi/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:java.compiler=
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.name=Mac OS X
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.arch=x86_64
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:os.version=10.16
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.name=bhagvan.kommadi
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.home=/Users/bhagvan.kommadi
3392 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Server environment:user.dir=/Users/bhagvan.kommadi/Downloads/apache_stom
4499 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2 snapdir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T/a38c5a4c-6ac0-423b-913e-097a53145a3f/version-2
4532 [main] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000
4541 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f
4817 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
4836 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler
4863 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
4959 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
4962 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2416498e
4998 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
5096 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
5096 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50490
5111 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50490
5114 [SyncThread:0] INFO org.apache.storm.zookeeper.server.persistence.FileTxnLog - Creating new log file: log.1
5126 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700000 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50490
5127 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700000, negotiated timeout = 20000
5130 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
5134 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
6193 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700000
6194 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700000 closed
6194 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
6196 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50490 which had sessionid 0x179a924e7700000
6197 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6197 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6198 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@43f0c2d1
6199 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6200 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50491
6200 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50491
6202 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700001 with negotiated timeout 20000 for client /127.0.0.1:50491
6202 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700001, negotiated timeout = 20000
6202 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6239 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
6240 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
6240 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23592946
6242 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
6243 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50492
6243 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50492
6245 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700002 with negotiated timeout 20000 for client /127.0.0.1:50492
6245 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700002, negotiated timeout = 20000
6245 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
6246 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7255 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700002
7257 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700002 closed
7257 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7257 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7258 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50492 which had sessionid 0x179a924e7700002
7258 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7258 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5c60b0a0
7260 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7261 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7261 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7261 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7261 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50493
7262 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50493
7262 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@23f3dbf0
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7264 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700003 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50493
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700003, negotiated timeout = 20000
7264 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7264 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7264 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50494
7265 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50494
7266 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700004 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50494
7266 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700004, negotiated timeout = 20000
7267 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7267 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7269 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700004
7270 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700004 closed
7284 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7284 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7284 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7286 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@31ff6309
7286 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50494 which had sessionid 0x179a924e7700004
7289 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7290 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7290 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50495
7291 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50495
7296 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700005 with negotiated timeout 20000 for client /127.0.0.1:50495
7296 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700005, negotiated timeout = 20000
7297 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7318 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
7345 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7345 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7346 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@46731692
7348 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7349 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50496
7349 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50496
7351 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700006 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50496
7351 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700006, negotiated timeout = 20000
7354 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7354 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7359 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700006
7360 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700006 closed
7360 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7360 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7361 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7361 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50496 which had sessionid 0x179a924e7700006
7362 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@ad9e63e
7364 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7365 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50497
7365 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50497
7367 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700007 with negotiated timeout 20000 for client /127.0.0.1:50497
7367 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700007, negotiated timeout = 20000
7367 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7404 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id bb3e0fa6-6811-4f10-ba4e-34f018909fa6 at host localhost
7407 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
7410 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7411 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7411 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@b0fc838
7413 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
7413 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
7413 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50498
7414 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50498
7415 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700008 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50498
7415 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e7700008, negotiated timeout = 20000
7415 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7415 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
7418 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700008
7419 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700008 closed
7420 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
7420 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50498 which had sessionid 0x179a924e7700008
7420 [main] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
7422 [main] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
7422 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@5fbdc49b
7424 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
7425 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50499
7425 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50499
7426 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e7700009 with negotiated timeout 20000 for client /127.0.0.1:50499
7427 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e7700009, negotiated timeout = 20000
7427 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7436 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 at host localhost
7508 [main] INFO backtype.storm.daemon.nimbus - Received topology submission for KafkaStormSample with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "storm.id" "KafkaStormSample-1-1622040441", "topology.debug" true, "topology.max.spout.pending" 1}
7540 [main] INFO backtype.storm.daemon.nimbus - Activating KafkaStormSample: KafkaStormSample-1-1622040441
7650 [main] INFO backtype.storm.scheduler.EvenScheduler - Available slots: (["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1028] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1029] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1024] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1025] ["bb3e0fa6-6811-4f10-ba4e-34f018909fa6" 1026])
7680 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment for topology id KafkaStormSample-1-1622040441: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d/nimbus/stormdist/KafkaStormSample-1-1622040441", :node->host {"563a836c-55ba-4b7e-9f4c-beaf44aea5a3" "localhost"}, :executor->node+port {[4 4] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [3 3] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [2 2] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027], [1 1] ["563a836c-55ba-4b7e-9f4c-beaf44aea5a3" 1027]}, :executor->start-time-secs {[4 4] 1622040441, [3 3] 1622040441, [2 2] 1622040441, [1 1] 1622040441}}
8921 [Thread-5] INFO backtype.storm.daemon.supervisor - Extracting resources from jar at /users/bhagvan.kommadi/desktop/kafka_2.11-0.9.0.0/libs/kafka_2.11-0.9.0.0-javadoc.jar to /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df/supervisor/stormdist/KafkaStormSample-1-1622040441/resources
8962 [Thread-6] INFO backtype.storm.daemon.supervisor - Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "KafkaStormSample-1-1622040441", :executors ([4 4] [3 3] [2 2] [1 1])} for this supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 on port 1027 with id 39344513-9c18-41e7-bf9b-fd894180922f
8969 [Thread-6] INFO backtype.storm.daemon.worker - Launching worker for KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
8970 [Thread-6] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
8970 [Thread-6] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
8972 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@559169a4
8978 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
8979 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:50500
8979 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:50500
8981 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000a with negotiated timeout 20000 for client /127.0.0.1:50500
8981 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x179a924e770000a, negotiated timeout = 20000
8982 [Thread-6-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
8982 [Thread-6-EventThread] INFO backtype.storm.zookeeper - Zookeeper state update: :connected:none
8985 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000a
8987 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50500 which had sessionid 0x179a924e770000a
8987 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000a closed
8987 [Thread-6-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
8987 [Thread-6] INFO backtype.storm.utils.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [5]
8989 [Thread-6] INFO org.apache.storm.curator.framework.imps.CuratorFrameworkImpl - Starting
8989 [Thread-6] INFO org.apache.storm.zookeeper.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.curator.ConnectionState@2a7cbfce
8992 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
8992 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
8992 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50501
8993 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50501
8994 [Thread-6-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x179a924e770000b, negotiated timeout = 20000
8994 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000b with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50501
8995 [Thread-6-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
9004 [Thread-6] INFO backtype.storm.daemon.worker - Reading Assignments.
9074 [Thread-6] INFO backtype.storm.daemon.worker - Launching receive-thread for 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027
9083 [Thread-7-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Starting receive-thread: [stormId: KafkaStormSample-1-1622040441, port: 1027, thread-id: 0 ]
9410 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor kafka-spout:[2 2]
9429 [Thread-6] INFO backtype.storm.daemon.task - Emitting: kafka-spout __system ["startup"]
9429 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks kafka-spout:[2 2]
9443 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor kafka-spout:[2 2]
9459 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor word-counter:[3 3]
9460 [Thread-6] INFO backtype.storm.daemon.task - Emitting: word-counter __system ["startup"]
9461 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks word-counter:[3 3]
9469 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor word-counter:[3 3]
9479 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor word-spitter:[4 4]
9480 [Thread-6] INFO backtype.storm.daemon.task - Emitting: word-spitter __system ["startup"]
9481 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks word-spitter:[4 4]
9483 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor word-spitter:[4 4]
9491 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor __system:[-1 -1]
9492 [Thread-6] INFO backtype.storm.daemon.task - Emitting: __system __system ["startup"]
9492 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks __system:[-1 -1]
9495 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor __system:[-1 -1]
9502 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor __acker:[1 1]
9506 [Thread-6] INFO backtype.storm.daemon.task - Emitting: __acker __system ["startup"]
9506 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks __acker:[1 1]
9509 [Thread-6] INFO backtype.storm.daemon.executor - Timeouts disabled for executor __acker:[1 1]
9509 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor __acker:[1 1]
9518 [Thread-6] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "KafkaStormSample-1-1622040441", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "KafkaStormSample", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 1, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.environment" nil, "topology.debug" true, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
9520 [Thread-6] INFO backtype.storm.daemon.worker - Worker 39344513-9c18-41e7-bf9b-fd894180922f for storm KafkaStormSample-1-1622040441 on 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 has finished loading
10039 [refresh-active-timer] INFO backtype.storm.daemon.worker - All connections are ready for worker 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:1027 with id 39344513-9c18-41e7-bf9b-fd894180922f
10062 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Opening spout kafka-spout:(2)
10087 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Preparing bolt word-counter:(3)
10095 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Prepared bolt word-counter:(3)
10107 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Preparing bolt word-spitter:(4)
10108 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Prepared bolt word-spitter:(4)
10113 [Thread-15-__system] INFO backtype.storm.daemon.executor - Preparing bolt __system:(-1)
10116 [Thread-15-__system] INFO backtype.storm.daemon.executor - Prepared bolt __system:(-1)
10132 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Preparing bolt __acker:(1)
10135 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Prepared bolt __acker:(1)
10197 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10220 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:50502
10222 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:50502
10223 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x179a924e770000c with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:50502
10230 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10232 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10245 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10283 [Thread-9-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}}
10284 [Thread-9-kafka-spout] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
10288 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Opened spout kafka-spout:(2)
10293 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Activating spout kafka-spout:(2)
10293 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
10297 [Thread-9-kafka-spout-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
10301 [Thread-9-kafka-spout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=0.0.0.0:9092}}
10303 [Thread-9-kafka-spout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=0.0.0.0:9092, partition=0}]
10303 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
10303 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=0.0.0.0:9092, partition=0}]
10526 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Read partition information from: /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db/partition_0 --> null
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
10836 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
10836 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
10838 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
10839 [Thread-9-kafka-spout] INFO storm.kafka.PartitionManager - Starting Kafka 0.0.0.0:0 from offset 0
10843 [Thread-9-kafka-spout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
10943 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [hi]
10945 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-9145905182196884541=-200665344721218530}, [hi]
10946 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [hi]
10946 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [hi]
10946 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-9145905182196884541 -200665344721218530]
10946 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-9145905182196884541 -200665344721218530]
10948 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-9145905182196884541 -200665344721218530 2]
10950 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [-9145905182196884541 -200665344721218530 2]
10951 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-9145905182196884541]
10954 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-9145905182196884541]
10955 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@749098b6
10955 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [kafka test]
10956 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3361071049515033607 -3553068181479967910 2]
10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3361071049515033607=-3553068181479967910}, [kafka test]
10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [kafka]
10956 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3361071049515033607 -3553068181479967910 2]
10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [test]
10956 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [3361071049515033607 -3553068181479967910]
10956 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [kafka]
10956 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [test]
10958 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3361071049515033607 -3553068181479967910]
10958 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3361071049515033607]
10958 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3361071049515033607]
10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@2c88dd7e
10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [storm check]
10959 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [5656882679628994049 -2821302035063205381 2]
10959 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {5656882679628994049=-2821302035063205381}, [storm check]
10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [storm]
10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [check]
10960 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [5656882679628994049 -2821302035063205381]
10960 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [storm]
10960 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [check]
10960 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [5656882679628994049 -2821302035063205381]
10961 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [5656882679628994049 -2821302035063205381 2]
10961 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [5656882679628994049]
10962 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [5656882679628994049]
10963 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@55ffcecb
10963 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [spark job]
10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {7133074069827401185=7517764075602487290}, [spark job]
10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [spark]
10963 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [job]
10964 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [spark]
10964 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [7133074069827401185 7517764075602487290]
10964 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [job]
10964 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [7133074069827401185 7517764075602487290]
10964 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [7133074069827401185 7517764075602487290 2]
10964 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [7133074069827401185 7517764075602487290 2]
10965 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [7133074069827401185]
10966 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [7133074069827401185]
10967 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@79baf4fe
10967 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [message]
10969 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {6343398862224861484=-4414870605500453828}, [message]
10969 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [6343398862224861484 -4414870605500453828 2]
10970 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [message]
10970 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [6343398862224861484 -4414870605500453828]
10970 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [6343398862224861484 -4414870605500453828 2]
10970 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [message]
10972 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [6343398862224861484 -4414870605500453828]
10972 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [6343398862224861484]
10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [6343398862224861484]
10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@135f554d
10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [operator]
10974 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {-3129322591382918852=-740436277297000921}, [operator]
10974 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [operator]
10974 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [-3129322591382918852 -740436277297000921 2]
10975 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [-3129322591382918852 -740436277297000921]
10975 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [operator]
10975 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [-3129322591382918852 -740436277297000921]
10975 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [-3129322591382918852 -740436277297000921 2]
10975 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [-3129322591382918852]
10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-3129322591382918852]
10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@46556960
10977 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [modulo]
10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {3640793944618690795=8106108915378369655}, [modulo]
10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [modulo]
10978 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [3640793944618690795 8106108915378369655]
10978 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [modulo]
10978 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [3640793944618690795 8106108915378369655]
10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [3640793944618690795 8106108915378369655 2]
10979 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [3640793944618690795 8106108915378369655 2]
10979 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [3640793944618690795]
10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [3640793944618690795]
10979 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@28df1f28
10980 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [remainder]
10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2596062899289317771=2634515203207894354}, [remainder]
10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [remainder]
10980 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2596062899289317771 2634515203207894354]
10980 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [remainder]
10981 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2596062899289317771 2634515203207894354]
10981 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2596062899289317771 2634515203207894354 2]
10981 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2596062899289317771 2634515203207894354 2]
10981 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2596062899289317771]
10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2596062899289317771]
10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@70d7b03d
10983 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [backtype]
10984 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2514155816374762264 -1773960649746285671 2]
10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2514155816374762264=-1773960649746285671}, [backtype]
10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [backtype]
10984 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2514155816374762264 -1773960649746285671]
10984 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2514155816374762264 -1773960649746285671 2]
10984 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [backtype]
10984 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2514155816374762264 -1773960649746285671]
10984 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2514155816374762264]
10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2514155816374762264]
10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@194b6657
10986 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout default [utility]
10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: default, id: {2895993183130256304=2076802810368494976}, [utility]
10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter default [utility]
10987 [Thread-13-word-spitter] INFO backtype.storm.daemon.task - Emitting: word-spitter __ack_ack [2895993183130256304 2076802810368494976]
10987 [Thread-11-word-counter] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: default, id: {}, [utility]
10987 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: word-spitter:4, stream: __ack_ack, id: {}, [2895993183130256304 2076802810368494976]
10989 [Thread-9-kafka-spout] INFO backtype.storm.daemon.task - Emitting: kafka-spout __ack_init [2895993183130256304 2076802810368494976 2]
10990 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka-spout:2, stream: __ack_init, id: {}, [2895993183130256304 2076802810368494976 2]
10990 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 2; __acker __ack_ack [2895993183130256304]
10991 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2895993183130256304]
10992 [Thread-9-kafka-spout] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@74380277
12988 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x179a924e770000c type:create cxid:0x3 zxid:0x22 txntype:-1 reqpath:n/a Error Path:/kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db Error:KeeperErrorCode = NoNode for /kafka_topic_phrases/8b2a044f-29f9-483a-8f2d-3c868d6434db
17697 [main] INFO backtype.storm.daemon.nimbus - Shutting down master
17699 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700001
17700 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700001 closed
17700 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17700 [main] INFO backtype.storm.daemon.nimbus - Shut down master
17700 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50491 which had sessionid 0x179a924e7700001
17703 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700003
17704 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50493 which had sessionid 0x179a924e7700003
17704 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700003 closed
17705 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17706 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700005
17707 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700005 closed
17707 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50495 which had sessionid 0x179a924e7700005
17707 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17708 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor bb3e0fa6-6811-4f10-ba4e-34f018909fa6
17709 [Thread-3] INFO backtype.storm.event - Event manager interrupted
17710 [Thread-4] INFO backtype.storm.event - Event manager interrupted
17712 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700007
17712 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700007 closed
17713 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17713 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50497 which had sessionid 0x179a924e7700007
17714 [main] INFO backtype.storm.daemon.supervisor - Shutting down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f
17714 [main] INFO backtype.storm.process-simulator - Killing process 9e419fa3-7aeb-4b37-982a-7ff15ea211fb
17715 [main] INFO backtype.storm.daemon.worker - Shutting down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027
17715 [main] INFO backtype.storm.daemon.worker - Shutting down receive thread
17715 [main] INFO backtype.storm.messaging.loader - Shutting down receiving-thread: [KafkaStormSample-1-1622040441, 1027]
17715 [main] INFO backtype.storm.messaging.loader - Waiting for receiving-thread:[KafkaStormSample-1-1622040441, 1027] to die
17716 [Thread-7-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Receiving-thread:[KafkaStormSample-1-1622040441, 1027] received shutdown notice
17717 [main] INFO backtype.storm.messaging.loader - Shutdown receiving-thread: [KafkaStormSample-1-1622040441, 1027]
17717 [main] INFO backtype.storm.daemon.worker - Shut down receive thread
17717 [main] INFO backtype.storm.daemon.worker - Terminating messaging context
17717 [main] INFO backtype.storm.daemon.worker - Shutting down executors
17718 [main] INFO backtype.storm.daemon.executor - Shutting down executor kafka-spout:[2 2]
17719 [Thread-9-kafka-spout] INFO backtype.storm.util - Async loop interrupted!
17719 [Thread-8-disruptor-executor[2 2]-send-queue] INFO backtype.storm.util - Async loop interrupted!
17725 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000c
17726 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50502 which had sessionid 0x179a924e770000c
17727 [main] INFO backtype.storm.daemon.executor - Shut down executor kafka-spout:[2 2]
17727 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-counter:[3 3]
17728 [Thread-11-word-counter] INFO backtype.storm.util - Async loop interrupted!
17728 [Thread-10-disruptor-executor[3 3]-send-queue] INFO backtype.storm.util - Async loop interrupted!
hi : 1
storm : 1
test : 1
utility : 1
check : 1
message : 1
operator : 1
spark : 1
kafka : 1
backtype : 1
job : 1
modulo : 1
remainder : 1
17729 [main] INFO backtype.storm.daemon.executor - Shut down executor word-counter:[3 3]
17729 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-spitter:[4 4]
17730 [Thread-13-word-spitter] INFO backtype.storm.util - Async loop interrupted!
17730 [Thread-12-disruptor-executor[4 4]-send-queue] INFO backtype.storm.util - Async loop interrupted!
17731 [main] INFO backtype.storm.daemon.executor - Shut down executor word-spitter:[4 4]
17732 [main] INFO backtype.storm.daemon.executor - Shutting down executor __system:[-1 -1]
17733 [Thread-15-__system] INFO backtype.storm.util - Async loop interrupted!
17734 [Thread-14-disruptor-executor[-1 -1]-send-queue] INFO backtype.storm.util - Async loop interrupted!
17736 [main] INFO backtype.storm.daemon.executor - Shut down executor __system:[-1 -1]
17736 [main] INFO backtype.storm.daemon.executor - Shutting down executor __acker:[1 1]
17736 [Thread-17-__acker] INFO backtype.storm.util - Async loop interrupted!
17737 [Thread-16-disruptor-executor[1 1]-send-queue] INFO backtype.storm.util - Async loop interrupted!
17737 [main] INFO backtype.storm.daemon.executor - Shut down executor __acker:[1 1]
17737 [main] INFO backtype.storm.daemon.worker - Shut down executors
17738 [main] INFO backtype.storm.daemon.worker - Shutting down transfer thread
17738 [Thread-18-disruptor-worker-transfer-queue] INFO backtype.storm.util - Async loop interrupted!
17740 [main] INFO backtype.storm.daemon.worker - Shut down transfer thread
17748 [main] INFO backtype.storm.daemon.worker - Shutting down default resources
17748 [main] INFO backtype.storm.daemon.worker - Shut down default resources
17762 [main] INFO backtype.storm.daemon.worker - Disconnecting from storm cluster state context
17763 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e770000b
17763 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /0:0:0:0:0:0:0:1:50501 which had sessionid 0x179a924e770000b
17764 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e770000b closed
17764 [Thread-6-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17764 [main] INFO backtype.storm.daemon.worker - Shut down worker KafkaStormSample-1-1622040441 563a836c-55ba-4b7e-9f4c-beaf44aea5a3 1027
17772 [main] INFO backtype.storm.daemon.supervisor - Shut down 563a836c-55ba-4b7e-9f4c-beaf44aea5a3:39344513-9c18-41e7-bf9b-fd894180922f
17773 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor 563a836c-55ba-4b7e-9f4c-beaf44aea5a3
17778 [Thread-5] INFO backtype.storm.event - Event manager interrupted
17779 [Thread-6] INFO backtype.storm.event - Event manager interrupted
17780 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x179a924e7700009
17780 [main] INFO org.apache.storm.zookeeper.ZooKeeper - Session: 0x179a924e7700009 closed
17781 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:50499 which had sessionid 0x179a924e7700009
17781 [main-EventThread] INFO org.apache.storm.zookeeper.ClientCnxn - EventThread shut down
17781 [main] INFO backtype.storm.testing - Shutting down in process zookeeper
17782 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO org.apache.storm.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
17782 [main] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - shutting down
17782 [main] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - Shutting down
17782 [main] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Shutting down
17782 [main] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - Shutting down
17782 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
17782 [SyncThread:0] INFO org.apache.storm.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
17783 [main] INFO org.apache.storm.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
17783 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper
17783 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//d9c56f79-8690-4ca4-856f-38179dcd654d
17789 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//a38c5a4c-6ac0-423b-913e-097a53145a3f
17791 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//4a2ac468-1fa6-4abd-9863-0b9babd83b88
17797 [main] INFO backtype.storm.testing - Deleting temporary path /var/folders/cr/0y892lq14qv7r24yl0gh0_dm0000gp/T//5cdefbab-dbe6-4d9a-b5f9-a06eb2e6d0df
17916 [SessionTracker] INFO org.apache.storm.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
You can see the snippet from the output to zoom into the results of the spout and the bolts of storm for the data sent to kafka topic.
Output from the Storm side code
hi : 1 storm : 1 test : 1 utility : 1 check : 1 message : 1 operator : 1 spark : 1 kafka : 1 backtype : 1 job : 1 modulo : 1 remainder : 1
3. Download the Source Code
You can download the full source code of this example here: Apache Kafka Integration With Storm



