Send different tuples from one spout to different bolts in Apache Storm

java,storm
Is it possible to send different tuples from one spout to different bolts in Apache Storm? For instance, I have Spout A, which needs to send Tuple B to Bolt C and Tuple D to Bolt E. How should I implement this in the spout in Java? I mean how...

Storm Bolt not printing/logging Kafka Spout

java,storm,apache-kafka
Edit: I added an .ack() to the Bolt (which required me to use a Rich Bolt instead of the basic bolt) and am having the same issue - nothing that tells me tuples are being processed by the bolt. If it matters, I'm running this on a CentOS image on...

Is there any way to ack tuples in only some of the bolts in a Storm topology?

storm
Since it's inefficient to ack all messages in Storm, and only some of the components in my topology need guaranteed message processing, I'd like to know whether there is a proper way to do this. For instance, I have a TimingBolt which takes tick tuple to make...

Print from Apache Storm Bolt

java,storm
I'm working my way through the example code of some Storm topologies and bolts, but I'm running into something weird. My goal is to set up Kafka with Storm, so that Storm can process the messages available on the Kafka bus. I have the following bolt defined: public class ReportBolt...

Storm topology performance hit when acking

storm
I'm using this tool from Yahoo to run some performance tests on my Storm cluster - https://github.com/yahoo/storm-perf-test I notice that there's almost a 10x performance hit when I turn acking on. Here are some details to reproduce the test - Cluster - 3 supervisor nodes and 1 nimbus node....

Storm vs Kafka and Processors

storm,apache-kafka
From reading the Storm documentation, I find it unclear exactly what Apache Storm gives me over having small processes that consume from a Kafka topic and produce to another topic. With this architecture I can change the number of each of these worker processes to suit which components are slower...

Apache Storm Installation without ZeroMQ/JZMQ

apache,storm
I am trying to set up a multi-cluster Storm system. I have found several third-party step-by-step guides on this. They all list Java, Python, ZeroMQ 2.1.7 and JZMQ as the requirements for the Nimbus and Supervisor/Slave nodes. But on the official Apache Storm website, the only requirements for...

How to export data from Cassandra to mongodb?

java,mongodb,cassandra,export,storm
I am using Apache (Kafka-Storm-Cassandra) for real-time processing. The problem I am facing is that I can't use aggregation queries on Cassandra directly (DataStax can be used, but it is a paid service). Moreover, I also considered using MongoDB, but it is not good for heavy, frequent writes. So, I am...

Using Apache Camel ProducerTemplate in Apache Storm bolt

java,apache-camel,storm
I'm trying to write a simple Storm + Camel project. My Storm topology analyzes tweets, and one bolt should send the tweet text to an Apache Camel route, which in turn uses a websocket to notify some webapp. I cannot make it work due to NotSerializableExceptions received from bolts when trying to use...

Storm Topology generating an exception

hadoop,storm
I am trying to send my data from a Kafka topic to HBase, using a Storm spout acting as a Kafka consumer. I am facing this exception in the Storm topology.... java.lang.RuntimeException: java.lang.RuntimeException: No leader found for partition 0 at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) at...

Apache Storm - spout and bolts not present in Storm UI

storm,apache-kafka
I am developing a Storm topology locally. I am using Storm 0.9.2-incubating and have developed a simple topology. When I deploy it using the LocalCluster() option, it works fine, but it does not show up in my Storm UI; it just executes. When I deploy it regularly, it will...

storm - how to choose a stream grouping

storm
I'm using the KafkaSpout to read / stream messages of compressed Byte[]. The bolts are simple: uncompress the message -> write to Cassandra. I'm wondering which Stream Grouping to use. The samples appear to mainly use the Shuffle Grouping. In testing I've been using the All Grouping (figuring that I...

Data transformations on-the-fly

mapping,apache-spark,storm
Is there a way, other than a manual mapping, to translate related values on-the-fly? I know this sounds vague, but what I am looking for is a way to take an input value of, say, "2015 Ford" and translate it according to a mapping provided by a client that indicates...

storm + kafka: understanding ack, fail and latency

java,storm
I'm using the KafkaSpout to consume from 2 Kafka topics, each of which has 6 partitions. The spout goes to a single bolt to unpack the relevant bytes and then to a second bolt for further processing. When I look at the Storm UI the numbers aren't making much sense and...

Turn off acking in KafkaSpout while respecting maxSpoutPending

storm,apache-kafka
I'm using storm 0.9.3. I'm trying to turn off acking per tuple for my topology. I set Config.TOPOLOGY_ACKER_EXECUTORS to 0, and maxSpoutPending to 500. When I run my topology, I'm noticing that maxSpoutPending is being ignored and the spout continues emitting well past that limit. Here's my config - config.setNumWorkers(3);...

Error while submitting topology to Storm cluster

java,storm,word-count,nimbus,topology
I am running a Storm topology. This is the basic wordcount topology. I am using a text file as the source and Storm for processing the data. While submitting it, I am facing these issues. I am very new to Storm. Please suggest the changes I need to make in the following code. Thanks in...

Storm: when to use setNumTasks?

storm
I'm curious about the circumstances that would necessitate the use of the setNumTasks function. The docs say that the default is one task for each executor. If I have an 'expensive' db task (calls to external dbs that take time) to run in a bolt with 'fast' tasks on either side...

Unable to use remote slave node in Apache Storm cluster

storm,supervisor
I'm following http://jayatiatblogs.blogspot.com/2011/11/storm-installation.html to try configuring a remote Apache Storm cluster using a few virtual machines (EC2) with Ubuntu 14.04 LTS on Amazon Web Services. My master node is 10.0.0.230, my slave node is 10.0.0.79. My ZooKeeper resides on my master node. When I use storm jar storm-starter-0.9.4-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote in...

Client serialization issue: read from Couchbase in Trident function

serialization,storm,trident
Basically, I want to read some additional attributes from Couchbase with attributes from the tuple, for example, the tuple has input fields "{a, b, c}", I want to emit a tuple "{a, b, c, d, e}" where "d" and "e" are read from Couchbase with "a" as the key. However,...

What happens if kafka spout is restarted by storm trident?

transactions,storm,apache-kafka,trident
Say the Kafka spout fetched some messages and at that time the spout task is restarted. Will the fetched messages be lost? I'm new to Trident, and my question is about the Trident Kafka transactional/opaque spout. Thanks in advance!...

Ubuntu Apache Storm jar Error: Could not find or load main class storm.starter

apache,jar,storm
I was following https://github.com/apache/storm/tree/master/examples/storm-starter . I'm using Ubuntu 14.04 LTS on VMware. I can run mvn exec:java -D storm.topology=storm.starter.RollingTopWords in the /home/user/storm/examples/storm-starter directory without problems. However, I can't run it in Apache Storm. I had storm-starter-topologies-0.9.3.jar in my /home/user/storm/examples/storm-starter directory. I also had storm-starter-0.9.3.jar & storm-starter-0.9.3-jar-with-dependencies.jar in my...

Data ingestion with Apache Storm

apache,storm,apache-kafka,flume
I have been reading a lot of articles where implementations of Apache Storm are explained for ingesting data from either Apache Flume or Apache Kafka. My main question remains unanswered after reading several articles. What is the main benefit of using Apache Kafka or Apache Flume? Why not collect data...

ShellBolt - Anchored onto “” after ack/fail

java,storm
I have a problem with my ShellBolt, which uses a cpp bolt with the multilang module. It dies because the ShellBolt's attribute "_inputs" is empty, and it throws the following runtime exception: "Anchored onto #Anchor after ack/fail". This is the error: ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died....

Storm Error when launching multilang subprocess

java,c++,storm
I get a Storm error when I use the cpp wrapper for Storm (StormCpp), which uses the ShellBolt's multilang subprocess. It gives me the following error: 10784 [Thread-17-split] ERROR backtype.storm.util - Async loop died! java.lang.RuntimeException: Error when launching multilang subprocess This is my Topology Java Code: public class...

Why is my streamparse topology definition complaining about a wrong number of arguments to thrift$mk-topology?

python,clojure,storm
I'm trying to get a very simple streamparse (i.e. Apache Storm) spout working, but I'm getting the error below when running sparse run -t 120: Caught exception: Wrong number of args (1) passed to: thrift$mk-topology clojure.lang.ArityException: Wrong number of args (1) passed to: thrift$mk-topology at clojure.lang.AFn.throwArity (AFn.java:437) clojure.lang.AFn.invoke (AFn.java:39) clojure.lang.AFn.applyToHelper...

Two part: How to run 'ls' from a java program and how to tell computers on a storm cluster to execute specific commands

java,linux,storm,topology
I'm writing a test Storm topology with minimal Java experience, so I'm figuring things out by brute force. My experience writing Storm topologies is also minimal. I have three supervisor nodes on my cluster and want each of them to run ls in the terminal, funnel the output...

Error in storm-starter: Could not find artifact 0.10.0-SNAPSHOT in clojars

storm
I cloned storm-starter from https://github.com/apache/storm/tree/master/examples/storm-starter, then went to build my jar locally using "mvn clean install -DskipTests=true". Here's the error I got: [ERROR] Failed to execute goal on project storm-starter: Could not resolve dependencies for project org.apache.storm:storm-starter:jar:0.10.0-SNAPSHOT: Could not find artifact org.apache.storm:storm-core:jar:0.10.0-SNAPSHOT in clojars (https://clojars.org/repo/) -> [Help...

ERROR backtype.storm.util - Async loop died! java.lang.RuntimeException: java.lang.NullPointerException

java,mongodb,mongodb-query,storm,apache-kafka
I am trying to write the output of the Storm word count program to MongoDB. This is the error I get when I execute the program, though I am able to print the result successfully. The problem starts when I try to write the output. The error generated : Jun 01,...

Storm-kafka spout failing

storm,apache-kafka
I am using Storm 0.9.4 with storm-kafka:0.9.0-wip16a-scala292 as the dependency to read from Kafka 0.7. Our Kafka retention policy is 7 days. I start reading from the latest offset of the brokers. Within a few minutes of starting the topology, I get the error below: kafka.common.OffsetOutOfRangeException: null...

Storm Fields grouping example

storm
I am using Kafka with Storm. Kafka emits a JSON string to Storm, and in Storm I want to distribute the load to a couple of workers based on a key/field in the JSON. How can I do that? In my case, it is the groupid field in the JSON string. For example, I have...
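
The routing idea behind this question can be sketched in plain Java, with no Storm dependency. The sketch below assumes a simple "hash the key, take it modulo the number of tasks" rule so that every message with the same groupid lands on the same task; the class name `FieldsGroupingSketch` and the helper `taskFor` are hypothetical names made up for illustration, and the rule is a general technique, not a claim about Storm's internal implementation.

```java
public class FieldsGroupingSketch {

    // Hypothetical helper: map a grouping key to a task index in [0, numTasks).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int taskFor(String groupId, int numTasks) {
        return Math.floorMod(groupId.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // assumed number of bolt tasks
        String[] groupIds = {"g1", "g2", "g1", "g3", "g2"}; // made-up sample keys

        // Equal keys always print the same task index.
        for (String id : groupIds) {
            System.out.println(id + " -> task " + taskFor(id, numTasks));
        }
    }
}
```

In a real topology the equivalent would presumably be a fields grouping on a declared field, in the style of the `fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"))` call quoted elsewhere in this listing. That in turn assumes an upstream bolt parses the JSON string and emits groupid as its own field.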

Apache Storm worker async loop died

java,zookeeper,storm,apache-kafka
I am trying to set up a Storm cluster that takes data from a Kafka bus and then processes it. So far, I have only included a single 'PrinterBolt' that should just output the message. I have tried running the example topologies in this repository and these work. Based on these...

Spark, Akka, Storm or RxJava [closed]

apache-spark,akka,storm,rx-java,spark-streaming
I have a use case where I get 10K to 15K messages/sec (sometimes fewer than 5K), and I push those into RabbitMQ. I then need to parse those messages, run some RE on them, do some sort of aggregation, and run some statistics. My product (the data...

storm - handling exceptions in bolt.execute

cassandra,storm
I'm using Storm to process a stream wherein one of the bolts is writing to Cassandra. The Cassandra session.execute() command can throw an exception, and I'm wondering about trapping this to 'fail' the tuple so it gets retried. The docs for IRichBolt don't show it throwing anything so I'm wondering...

How to unit test Java Hbase API

java,hadoop,mocking,hbase,storm
I am using the Java HBase API to get a value from Hbase. This is my code. public class GetViewFromHbaseBolt extends BaseBasicBolt { private HTable table; private String zkQuorum; private String zkClientPort; private String tableName; public GetViewFromHbaseBolt(String table, String zkQuorum, String zkClientPort) { this.tableName = table; this.zkQuorum = zkQuorum; this.zkClientPort...

An Apache Storm bolt receiving multiple input tuples from different spouts/bolts

java,storm
Is it possible for a bolt to receive multiple input tuples from different spouts/bolts? For instance, Bolt C receives input tuples from Spout A and input tuples from Bolt B to be processed. How should I implement it? I mean writing the Java code for Bolt C and also its topology.

Solr Indexing in Storm topology vs Hbase NG Indexer

indexing,solr,hbase,storm
I am working on designing the data indexing feature for Solr. We are using a Storm topology and have an HBase bolt that adds data to HBase. The requirement is that whatever data we add to HBase needs to be indexed as well. The following are the options:...

Storm controlling the way bolts emit data

java,storm
I have a small topology. It has a Kafka spout and a bolt reading from the spout (Bolt A). Bolt A emits to two bolts (Bolt B and Bolt C). I have used fields grouping. Bolt A emits two different types of data. One is intended for Bolt B and the other...

storm supervisor exits when processing event

java,storm,apache-storm
When I start the supervisor by issuing bin/storm supervisor, it exits. The logs show: 2015-06-12T02:28:27.811-0700 b.s.event [ERROR] Error when processing event java.lang.NoSuchMethodError: org.apache.commons.io.FileUtils.moveDirectory(Ljava/io/File;Ljava/io/File;)V at backtype.storm.daemon.supervisor$fn__7480.invoke(supervisor.clj:489) ~[storm-core-0.9.5.jar:0.9.5] at clojure.lang.MultiFn.invoke(MultiFn.java:241) ~[clojure-1.5.1.jar:na] at...

Apache Storm: Nimbus not starting on Port 6627

java,apache,bigdata,storm
I can't see anything on port 6627 after starting Nimbus. I am getting the Connection Refused error. Following errors are thrown in Nimbus Log: 6899 [main] ERROR com.smarterme.intake.EmbeddedTopologyRunner - Toplogy submitting failed.....org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused at backtype.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:38) at...

How to implement log changes in a storm project locally using Eclipse

java,eclipse,logback,storm
I can't figure out how to apply logging changes to a local Storm project using Eclipse. On server-deployed code I can make changes without problems by modifying the ./logback/clusters.xml file (e.g. changing the log level from INFO to ERROR). How could I go about doing this locally?

KafkaSpout working example [closed]

java,storm,apache-kafka
I recently got familiar with Apache Kafka and have a working example of a producer-consumer. My next step is to integrate Kafka with a spout and bolt, and I am having a hard time getting the available examples (they are mostly old) working locally. I got the following example working storm-book/examples-ch02-getting_started which...

What is/are the main difference(s) between Flink and Storm?

storm,flink
Flink has been compared to Spark, which, as I see it, is the wrong comparison; Similarly, it does not make that much sense to me to compare Flink to Samza. In both cases it compares a real-time vs. a batched event processing strategy, even if at a smaller "scale" in...

Storm UI to display topology after killed

storm,apache-storm
Is there a configuration parameter in Storm so that a topology will still be displayed after it is killed? Thank you...

Error from ZooKeeper 3.4.6

zookeeper,storm
I downloaded zookeeper-3.4.6.tar.gz, and while executing zkServer.sh start, I get the error below. I searched Google but couldn't find a solution. Please let me know if you have seen similar issues. CLASSPATH=/home/spanda20/zookeeper/bin/../src/java/lib/*.jar:/home/spanda20/zookeeper/bin/../conf: zkServer.sh: 81: /home/spanda20/zookeeper/bin/zkEnv.sh: Syntax error: "(" unexpected (expecting "fi") ...

Send byte array to storm kafka bolt

java,apache,storm,apache-kafka,kafka-consumer-api
I have written a Storm topology. I basically want to send tuples in Avro schema, in the form of a byte array, to a Kafka topic. This is how I set the bolt : builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>()) .fieldsGrouping(BOLT1, new Fields("key")); And this is how I am converting to byte array Schema schema...

apache storm, load balance, json

storm
I am using Kafka with Storm. Kafka emits a JSON string to Storm, and in Storm I want to distribute the load to a couple of workers based on a key/field in the JSON. How can I do that? In my case, it is the groupid field in the JSON string. For example, I have...

Apache storm - java.lang.NoClassDefFoundError: com/google/gson/Gson

java,maven,gson,noclassdeffounderror,storm
I'm using Apache Storm 0.9.4. I set up a 5-node cluster and it works fine. (Actually, 5 supervisors are running as Docker containers on 5 different physical nodes.) My environment is here $cat /etc/redhat-release CentOS release 6.6 (Final) $docker -v Docker version 1.4.1, build 5bc2ff8/1.4.1 $java -version java version...

Storm Spout ERROR backtype.storm.util - Async loop died

java,storm
My data comes from a sensor that is connected to my storm cluster by a websocket, so whenever a datapoint arrives on my websocket server I add it to a ConcurrentLinkedQueue. I have no a priori information regarding the frequency of datapoints "production". My spout takes the datapoint on this...

Storm results visualization

visualization,storm
I've spent hours trying to find the best way to visualize the results of my Storm system. There seem to be endless combinations of technologies and I'm getting completely lost. I want to avoid the use of a database so from what I have understood my system should have...

storm cluster mode, distributed bolt/worker load sharing

storm
Hi: I will have a large-capacity Storm analysis task. I want to spin off many bolts/workers across different nodes/machines to take the task so that every machine shares the load. I am wondering how to write the bolts/workers/topology so that they can communicate with each other....

How can I serialize a numpy array while preserving matrix dimensions?

python,json,numpy,storm
numpy.array.tostring doesn't seem to preserve information about matrix dimensions (see this question), requiring the user to issue a call to numpy.array.reshape. Is there a way to serialize a numpy array to JSON format while preserving this information? Note: The arrays may contain ints, floats or bools. It's reasonable to expect...

Parallelism in Apache Storm

storm
I am new to Apache Storm and trying to design a simple topology for my use case. The explanation for parallelism in Storm (Understanding the Parallelism of a Storm Topology) has left me with two queries: 1) Is it safe to assume that the same worker will have the executors for...

Field and values connection in Storm

stream,field,tuples,storm
I have a fundamental question about Storm. I can clearly understand some basic things. For example, I have a main class with this code inside: ... TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, new SentenceSpout()); builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).globalGrouping(COUNT_BOLT_ID); ... and I understand...

Storm UI not working

storm
We are executing a Storm topology in pseudo mode. The topology is executing fine and we are able to connect to the Storm UI (8080), but the Storm UI is not displaying the running topology information. We also restarted the Storm UI process, but to no avail. Does Storm need special configuration to display running topology in...

Kafka, nodejs client (kafkaesque) socket default timeout

javascript,node.js,ubuntu-14.04,storm
I am using Ubuntu 14.04 with the stack nodejs -> kafkaesque (kafka nodejs client) -> kafka_2.9.2-0.8.2.1 -> storm. In Kafkaesque (kafka nodejs client), I notice that api.js has the following line to create a socket: _socket = net.createConnection(_options.port, _options.host); I am wondering what the default value for the socket timeout is. Shall I setTimeout(0) if I want socket connection is always...

Error while deploying topology on storm cluster

java,logging,storm,apache-kafka,kafka
I am trying to deploy a simple word count topology on a Storm cluster. I am using Kafka as the input (Kafka spout). This is the error I am getting. I am very new to Storm. Please suggest the changes. Thanks in advance! java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory at org.apache.log4j.Logger.getLogger(Logger.java:39) at kafka.utils.Logging$class.logger(Logging.scala:24) at kafka.consumer.SimpleConsumer.logger$lzycompute(SimpleConsumer.scala:30) at...

Apache Zookeeper multi-nodes cluster not running

apache,zookeeper,storm
I'm following http://jayatiatblogs.blogspot.com/2011/11/storm-installation.html & http://seaip.narlabs.org.tw/upload/content_file/547c1db495987.pdf to set up my Apache Storm & Apache ZooKeeper cluster on Ubuntu 14.04 LTS in Amazon Web Services EC2. Below is the zoo.cfg for my slave nodes: ## The number of milliseconds of each tick. The length of a single tick, which is...

Execution flow of a storm program

storm
I am new to Storm and trying to understand the flow of execution of different methods from spout to bolt. A spout has methods like nextTuple() open() declareOutputFields() activate() deactivate() and a bolt has methods like prepare() execute() cleanup() declareOutputFields() so can anyone tell me the sequence of execution...

stormconf.ser doesn't exist error on worker

zookeeper,storm
I have a 2-node Storm cluster and 1 ZooKeeper node. One of the workers dies because of the following error. Does anyone have an idea why the stormconf.ser file is getting deleted? I'm using Storm 0.9.2 and ZooKeeper 3.4.6. o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED 2015-01-31 01:23:06 o.a.c.f.s.ConnectionStateManager [WARN] There...