

Spark: Group RDD by id

sql,hadoop,mapreduce,apache-spark,rdd
I have 2 RDDs. In Spark Scala, how do I join event1001RDD and event2009RDD if they have the same id? val event1001RDD: schemaRDD = [eventtype,id,location,date1] [1001,4929102,LOC01,2015-01-20 10:44:39] [1001,4929103,LOC02,2015-01-20 10:44:39] [1001,4929104,LOC03,2015-01-20 10:44:39] val event2009RDD: schemaRDD = [eventtype,id,date1,date2] [2009,4929101,2015-01-20 20:44:39,2015-01-20 20:44:39] [2009,4929102,2015-01-20 15:44:39,2015-01-20 21:44:39] [2009,4929103,2015-01-20 14:44:39,2015-01-20 14:44:39] [2009,4929105,2015-01-20...
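One rough sketch of such a join, with made-up sample rows standing in for the two schemaRDDs and sc being the usual shell SparkContext: key both RDDs by id, then join on that key.

val event1001 = sc.parallelize(Seq(
  ("1001", "4929102", "LOC01", "2015-01-20 10:44:39"),
  ("1001", "4929103", "LOC02", "2015-01-20 10:44:39")))
val event2009 = sc.parallelize(Seq(
  ("2009", "4929102", "2015-01-20 15:44:39", "2015-01-20 21:44:39"),
  ("2009", "4929103", "2015-01-20 14:44:39", "2015-01-20 14:44:39")))
// key both sides by id and join; on Spark < 1.3 also: import org.apache.spark.SparkContext._
val joined = event1001.map { case (t, id, loc, d1) => (id, (t, loc, d1)) }
  .join(event2009.map { case (t, id, d1, d2) => (id, (t, d1, d2)) })
joined.collect().foreach(println)   // only the rows whose id appears in both RDDs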

Scala Spark: get the top words in each row of an Array

scala,apache-spark,scala-collections,rdd
I'm unable to get the top word in an Array of Ints and Strings. See the Array below and the required output. Consider that n is an RDD, and suggest the functions required to get the output. scala> n.take(10) res3: Array[(Int, String)] = Array((4,Hi how are you ,how), (2,hello good...

How to judge if an RDD will be loaded into RAM?

apache-spark,rdd
An example from http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf is as follows: lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR")) errors.persist() The thesis says: "Note that the base RDD, lines, is not loaded into RAM. This is desirable because the error messages might only be a small fraction of the data (small enough to fit into memory)" My...

JDBC RDD Query Statement without '?'

sql,scala,jdbc,apache-spark,rdd
I am using Spark with Scala and trying to get data from a database using JdbcRDD. val rdd = new JdbcRDD(sparkContext, driverFactory, testQuery, rangeMinValue.get, rangeMaxValue.get, partitionCount, rowMapper) .persist(StorageLevel.MEMORY_AND_DISK) Within the query there are no ? placeholders to set (since the query is quite long, I am not putting it here)...

Spark: sort RDD and join its rank

apache-spark,rdd
I have an RDD[(VertexId, Double)], and I want to sort it by _._2 and join the index (rank) to this RDD, so that I can get an element and its rank with a filter. Currently I sort the RDD with sortBy, but I do not know how to join an RDD with its...
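A common pattern for this, sketched here with sample data rather than the asker's RDD: sort first, then zipWithIndex assigns each element its position in the sorted order.

val scores = sc.parallelize(Seq((1L, 3.5), (2L, 1.2), (3L, 7.8)))   // stands in for RDD[(VertexId, Double)]
val ranked = scores.sortBy(_._2, ascending = false)
  .zipWithIndex()                                         // ((id, score), rank)
  .map { case ((id, score), rank) => (id, score, rank) }
val rankOfVertex2 = ranked.filter(_._1 == 2L).collect()   // an element and its rank, via filter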

How to interpret RDD.treeAggregate

scala,apache-spark,rdd
I ran into this line in Spark source val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( seqOp = (c, v) => { // c: (grad, loss, count), v: (label, features) val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) (c._1, c._2 + l, c._3 + 1) },...
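To see the shape of the call outside MLlib, here is a small self-contained treeAggregate that computes a (sum, count) pair: seqOp folds one element into a partition's accumulator, combOp merges two accumulators, and the first argument is the zero accumulator.

val nums = sc.parallelize(1 to 100)
val (sum, count) = nums.treeAggregate((0L, 0L))(
  seqOp  = (acc, v) => (acc._1 + v, acc._2 + 1),    // fold one element into the partial result
  combOp = (a, b)   => (a._1 + b._1, a._2 + b._2))  // merge partial results, tree-style
println(s"sum=$sum count=$count")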

Spark zipPartitions on the same RDD

apache-spark,rdd
I'm quite a newbie with Spark and I have a problem doing something like a cartesian product, but only within the same partition. Maybe an example can show clearly what I want to do: let's suppose we have an RDD made with sc.parallelize(1,2,3,4,5,6) and this RDD is partitioned in three...
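One way to get a cartesian product restricted to each partition, as a sketch that assumes every single partition fits in memory: mapPartitions materialises the partition once and emits all pairs drawn from it.

val rdd = sc.parallelize(1 to 6, 3)                 // three partitions
val pairsWithinPartition = rdd.mapPartitions { it =>
  val elems = it.toList                             // only this partition's elements
  (for (a <- elems; b <- elems) yield (a, b)).iterator
}
pairsWithinPartition.collect().foreach(println)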

Map a table of a Cassandra database using Spark and RDD

java,mapreduce,apache-spark,rdd
I have to map a table that records the utilization history of an app. The table has these tuples: <AppId,date,cpuUsage,memoryUsage> <AppId,date,cpuUsage,memoryUsage> <AppId,date,cpuUsage,memoryUsage> <AppId,date,cpuUsage,memoryUsage> <AppId,date,cpuUsage,memoryUsage> AppId is always different, because it refers to many different apps; date is expressed in the format dd/mm/yyyy hh/mm; cpuUsage and memoryUsage are...

Ordered union on Spark RDDs

apache-spark,rdd
I am trying to sort key-record pairs by key using Apache Spark. The key is 10 bytes long and the value is about 90 bytes long. In other words, I am trying to replicate the sort benchmark Databricks used to break the sorting record. One of the...

What is RDD dependency in Spark?

apache-spark,rdd
As I know, there are two types of dependencies: narrow and wide. But I don't understand how a dependency affects the child RDD. Is the child RDD only metadata that describes how to build new RDD blocks from the parent RDD? Or is the child RDD a self-sufficient set of data which was created...
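A child RDD is essentially a description (metadata plus a compute function) until it is persisted, and its dependencies can be inspected directly; a small sketch:

val parent = sc.parallelize(1 to 10)
val child  = parent.map(_ * 2)                              // narrow dependency
val wide   = child.map(x => (x % 3, x)).reduceByKey(_ + _)  // wide (shuffle) dependency; Spark < 1.3: import org.apache.spark.SparkContext._
println(child.dependencies)   // e.g. a OneToOneDependency on `parent`
println(wide.toDebugString)   // full lineage, including the shuffle boundary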

When using textFile to create an RDD in Spark, what is the index that is displayed in the result?

apache-spark,rdd
When I create an RDD using sc.textFile in Spark, I get a result like: org.apache.spark.rdd.RDD[String] = file:///home/cloudera/data MapPartitionsRDD[133] at textFile at <console>:23 What does the [133] represent? I see that it increases, so it feels like some kind of ID....
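The number in brackets is the RDD's id, a counter the SparkContext increments for each RDD it creates; it can also be read programmatically, as in this sketch (the path is hypothetical):

val a = sc.textFile("/some/hypothetical/path")
val b = a.map(_.length)
println(a.id)   // the same number that appears as MapPartitionsRDD[...] in the shell output
println(b.id)   // a larger id, since this RDD was created later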

Sorting an RDD in Apache Spark using mapPartitions and reduce

java,apache-spark,rdd
I am trying to sort an RDD in Spark. I am aware that I can use the sortBy transformation to obtain a sorted RDD. I am trying to measure how sortBy performs when compared to using mapPartitions to sort individual partitions, and then using a reduce function to merge the...
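A rough sketch of that second variant, assuming Int data: each partition is sorted locally with mapPartitions, and the per-partition sorted lists are then merged pairwise by reduce.

val mergeSorted = (a: List[Int], b: List[Int]) => {
  // standard merge of two sorted lists
  val out = scala.collection.mutable.ListBuffer.empty[Int]
  var (x, y) = (a, b)
  while (x.nonEmpty && y.nonEmpty) {
    if (x.head <= y.head) { out += x.head; x = x.tail } else { out += y.head; y = y.tail }
  }
  (out ++= x ++= y).toList
}
val rdd = sc.parallelize(Seq(5, 3, 9, 1, 7, 2), 3)
val sorted = rdd.mapPartitions(it => Iterator(it.toList.sorted))   // one sorted list per partition
  .reduce(mergeSorted)                                             // merged into a single sorted List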

PySpark repartitioning RDD elements

hadoop,apache-spark,partitioning,rdd,pyspark
I have a spark job that reads from a Kafka stream and performs an action for each RDD in the stream. If the RDD is not empty, I want to save the RDD to HDFS, but I want to create a file for each element in the RDD. I've found...

How to check if Spark RDD is in memory?

apache-spark,rdd,in-memory
I have an instance of org.apache.spark.rdd.RDD[MyClass]. How can I programmatically check whether the instance is persisted/in memory?
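Two programmatic checks, as a sketch: getStorageLevel reports the level that was requested for the RDD, and SparkContext.getPersistentRDDs lists the RDDs currently registered as persistent.

val rdd = sc.parallelize(1 to 100).persist()
println(rdd.getStorageLevel.useMemory)          // true if the chosen level keeps data in memory
println(sc.getPersistentRDDs.contains(rdd.id))  // true once the RDD is marked persistent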

How does Spark handle missing values?

apache-spark,rdd
Apache Spark supports sparse data. For example, we can use MLUtils.loadLibSVMFile(...) to load data into an RDD. I was wondering how Spark deals with those missing values. ...

PySpark Create Multi Indexed Paired RDD with function

apache-spark,rdd,pyspark
A little while ago I asked a question about organizing and structuring an RDD with a multiple keys. See PySpark Suggestion on how to organize RDD Each object in my current rdd contains a start_time, end_time, id, and position. I want to group the objects by id and time. I...

How to use saveToCassandra()

cassandra,apache-spark,spark-streaming,rdd
I am new to Spark. I have an RDD and I want to save its data into more than one table in Cassandra. Is this possible? If yes, then how?

Spark: Group RDD Sql Query

sql,hadoop,apache-spark,rdd,apache-spark-sql
I have 3 RDDs that I need to join. val event1001RDD: schemaRDD = [eventtype,id,location,date1] [1001,4929102,LOC01,2015-01-20 10:44:39] [1001,4929103,LOC02,2015-01-20 10:44:39] [1001,4929104,LOC03,2015-01-20 10:44:39] val event2009RDD: schemaRDD = [eventtype,id,celltype,date1] (not grouped by id since I need 4 dates from this depending on celltype) [2009,4929101,R01,2015-01-20 20:44:39] [2009,4929102,R02,2015-01-20 14:00:00] (RPM) [2009,4929102,P01,2015-01-20 12:00:00] (PPM) [2009,4929102,R03,2015-01-20 15:00:00] (RPM)...

Adding Constant to RDD

scala,apache-spark,rdd
I have a really stupid question: I know that an RDD is immutable, but is there any way to add a column of constants to an RDD? More specifically, I have an RDD of type RDD[(a: String, b: String)], and I wish to add a column of 1's after it so that...
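Since the RDD is immutable, the usual route is to map to a new RDD that carries the extra field; a minimal sketch:

val pairs = sc.parallelize(Seq(("a1", "b1"), ("a2", "b2")))   // RDD[(String, String)]
val withOnes = pairs.map { case (a, b) => (a, b, 1) }         // RDD[(String, String, Int)] with a constant column of 1's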

How do I subtract an RDD[(Key,Object)] from another one?

scala,apache-spark,rdd
I want to change the format of my data from RDD[(Label: String, (ID: String, Data: Array[Double]))] to an RDD of objects with the label, id and data as components. But when I print my RDD twice in a row, the object references change: class Data_Object(private val id:String, private var vector:Vector) extends Serializable { var label =...
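For the subtraction in the title, subtractByKey drops every pair whose key appears in the other RDD; the changing references described in the body are typically a separate effect of the RDD being recomputed on each action unless it is persisted. A sketch:

val left  = sc.parallelize(Seq(("k1", "v1"), ("k2", "v2"), ("k3", "v3")))
val right = sc.parallelize(Seq(("k2", "x")))
val remaining = left.subtractByKey(right)   // keeps ("k1","v1") and ("k3","v3"); Spark < 1.3: import org.apache.spark.SparkContext._
remaining.cache()                           // avoids recomputation (and fresh object instances) on repeated actions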

In Spark, what is left in the memory after a Job is done?

memory,apache-spark,rdd
I used ./bin/spark-shell to run some experiments and found the following. When running jobs (transformation + action), I watch the memory usage in top. For example, for a 5G text file, I did a simple filter() and count(). After the job is done, there are 7g marked as...

OFF_HEAP RDD was removed automatically by Tachyon after the Spark job finished

apache-spark,rdd,tachyon
I run a Spark application that uses StorageLevel.OFF_HEAP to persist an RDD (my Tachyon and Spark are both in local mode), like this: val lines = sc.textFile("FILE_PATH/test-lines-1") val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).persist(StorageLevel.OFF_HEAP) val counts = words.reduceByKey(_ + _) counts.collect.foreach(println) ... sc.stop When the persist is done, I can see...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]

scala,apache-spark,rdd
I did some calculations and returned my values in an RDD containing a Scala Map, and now I want to flatten that Map and collect all the key-value pairs into an RDD. Any help will be appreciated....
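flatMap does this directly, because a Scala Map is itself a collection of key-value pairs; a minimal sketch:

val maps = sc.parallelize(Seq(Map("a" -> 1.0, "b" -> 2.0), Map("c" -> 3.0)))   // RDD[Map[String, Double]]
val pairs = maps.flatMap(m => m)                                               // RDD[(String, Double)]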

How to load data from saved file with Spark

apache-spark,rdd
Spark provides the method saveAsTextFile, which can easily store an RDD[T] to disk or HDFS, where T is an arbitrary serializable class. I want to reverse the operation. I wonder whether there is a loadFromTextFile that can easily load a file into an RDD[T]? Let me make it clear: class A extends Serializable {...
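For arbitrary serializable classes, the usual counterpart of saveAsTextFile is saveAsObjectFile together with SparkContext.objectFile, which reads the data back with the element type you specify; a sketch with a hypothetical output path:

class A(val name: String) extends Serializable

val as = sc.parallelize(Seq(new A("x"), new A("y")))
as.saveAsObjectFile("/tmp/a-objects")               // hypothetical path
val reloaded = sc.objectFile[A]("/tmp/a-objects")   // RDD[A] read back from disk
println(reloaded.count())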

Scala operations on part of an RDD

scala,apache-spark,rdd
I am new to Scala and I am trying to do something for a project: I generated an RDD: RDD [UserID1, Date1, Value1] [UserID1, Date2, Value2] [UserID1, Date3, Value3] [UserID2, Date1, Value1] [UserID3, Date1, Value1] I wish to run a function on this RDD that generates RDD [UserID1, FunctionResult1, FunctionResult2]...

In Spark, what's the relationship between the number of stages and parallelism?

apache-spark,rdd
In Spark, what's the relationship between the number of stages and parallelism? It would be best to give an example.

Functional approach in sequential RDD processing [Apache Spark]

apache-spark,rdd
I have an RDD connected to an HBase table. Each row (key) represents a GPS location. Now I've written a function to calculate the distance between two points. The function should be called with the current row and its predecessor [i-1]. Now I'm struggling to get this done in a functional...

What's the meaning of “already computed partitions that can short-circuit the computation of a parent RDD”?

apache-spark,rdd,apache-spark-sql
The Spark thesis (http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf) says what is shown in the picture below. I don't understand the meaning of "already computed partitions that can short-circuit the computation of a parent RDD". Can you explain it to me and give one or two examples?...

Why is Spark so fast at word count?

parallel-processing,streaming,apache-spark,bigdata,rdd
Test case: word counting over 6G of data in 20+ seconds with Spark. I understand the MapReduce, FP and stream programming models, but couldn't figure out why the word counting is so amazingly fast. I think it's an I/O-intensive computation in this case, and it's impossible to scan 6G of files in 20+...

Print elements of a particular RDD partition in Spark

apache-spark,rdd
foreach() prints all the elements in the RDD; what should I do to print only the elements of a particular partition (e.g. the 5th partition)? val data = 1 to 50 val distData = sc.parallelize(data,10) distData.foreach(println)...
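mapPartitionsWithIndex exposes each partition's index, so you can keep only the one you want; a sketch (indices are 0-based):

val data = sc.parallelize(1 to 50, 10)
val fifth = data.mapPartitionsWithIndex((idx, it) => if (idx == 5) it else Iterator.empty)
fifth.collect().foreach(println)   // only the elements that live in partition 5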

What is the result of RDD transformation in Spark?

apache-spark,rdd
Can anyone explain what the result of an RDD transformation is? Is it a new set of data (a copy of the data), or is it only a new set of pointers to filtered blocks of the old data?

PySpark Suggestion on how to organize RDD

apache-spark,rdd
I'm a Spark newbie and I'm trying to test something out on Spark and see if there are any performance boosts for the size of data that I'm using. Each object in my rdd contains a time, id, and position. I want to compare the positions of groups with same...

Join two (non)paired RDDs to make a DataFrame

apache-spark,rdd,apache-spark-sql,pyspark
As the title describes, say I have two RDDs rdd1 = sc.parallelize([1,2,3]) rdd2 = sc.parallelize([1,0,0]) or rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)]) rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)]) How can I create the following DataFrame? Id Result 1 1 2 0 3 0 If I could create the paired RDD [(1,1),(2,0),(3,0)]...

ReduceByKey with a byte array as the key

apache-spark,rdd
I would like to work with RDD pairs of Tuple2<byte[], obj>, but byte[]s with the same contents are considered different keys because their reference values are different. I didn't see any way to pass in a custom comparator. I could convert the byte[] into a String with an explicit charset,...
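Because Java arrays compare by reference, one workaround is to wrap the bytes in a type with value equality before the reduceByKey; a sketch in Scala using Seq[Byte]:

val raw = sc.parallelize(Seq((Array[Byte](1, 2), 10), (Array[Byte](1, 2), 5)))
val reduced = raw
  .map { case (k, v) => (k.toSeq, v) }   // Seq[Byte] has structural equals/hashCode
  .reduceByKey(_ + _)                    // the two identical keys now combine; Spark < 1.3: import org.apache.spark.SparkContext._
reduced.collect().foreach(println)       // (WrappedArray(1, 2), 15)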

My RDD changes its values by itself

scala,apache-spark,rdd
I have a basic RDD[Object] to which I apply a map with a hash function on the Object values, using Scala's nextGaussian and nextDouble functions. When I print the values, they change on each print. def hashmin(x:Data_Object, w:Double) = { val x1 = x.get_vector.toArray var a1 = Array(0.0).tail val b = Random.nextDouble...

Usage of local variables in closures when accessing Spark RDDs

closures,apache-spark,rdd,pyspark
I have a question regarding the usage of local variables in closures when accessing Spark RDDs. The problem I would like to solve looks as follows: I have a list of textfiles that should be read into an RDD. However, first I need to add additional information to an RDD...

Spark: Intersection of Lists not working

list,scala,apache-spark,rdd
I have an RDD of the form: t1-> (Long, List[Long]) and a list of the form t2-> List[Long] I need to perform union and intersection of the lists. I am trying the following code: val t1 = a.map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _) val t2 = b.map(x => (x._1, (List(x._2)))).reduceByKey(_...

Scala - sort RDD partitions

scala,sorting,apache-spark,rdd
Assume I have an RDD of Integers from 1 to 1,000,000,000 and I want to print them in order using foreachPartition. There might be a situation where the partition containing 5-6-7-8 is printed before 1-2-3-4. How can I prevent this? Thanks, Maya...
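sortBy already places smaller values in lower-numbered partitions; the remaining issue is the printing, which has to happen on the driver in partition order. toLocalIterator streams the partitions back one at a time, in order; a sketch:

val nums = sc.parallelize(1 to 1000, 8)
val sorted = nums.sortBy(identity)
sorted.toLocalIterator.foreach(println)   // pulls one partition at a time to the driver, in order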

Spark RDD repeated reduce operations yielding inconsistent results

scala,mapreduce,apache-spark,reduce,rdd
Consider the following code in Spark that should return the sum of the sqrt's of a sequence of integers: // Create an RDD of a sequence of integers val data = sc.parallelize(Range(0,100)) // Transform RDD to sequence of Doubles val x = data.map(_.toDouble) // Reduce the sequence as the sum...
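A frequent cause of this symptom is a reduce function that is not associative and commutative (for example, applying sqrt inside the reduce itself). Keeping the reduce a plain sum and doing the sqrt in a map gives stable results; a sketch:

val data = sc.parallelize(0 until 100)
val sumOfSqrts = data.map(i => math.sqrt(i.toDouble)).reduce(_ + _)   // the reduce stays associative and commutative
println(sumOfSqrts)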

Collect results from RDDs in a dstream driver program

apache-spark,spark-streaming,rdd,dstream
I have this function in the driver program, which collects the results from the RDDs into an array and sends them back. However, even though the RDDs (in the dstream) have data, the function is returning an empty array... What am I doing wrong? def runTopFunction() : Array[(String, Int)] = { val...

Apache Spark RDD partitioning and join

apache-spark,rdd
When I join two RDDs, where is the data actually joined? That is, is the data aggregated on the driver and then shipped back out to the worker nodes, or is one of the nodes randomly selected to "receive" the data? Furthermore, if I call partition on a pairRDD then is...

Trying to get Spark Streaming to read a data stream from a website; what is the socket?

hadoop,apache-spark,spark-streaming,rdd
I am trying to get this data, http://stream.meetup.com/2/rsvps, into a Spark stream. They are JSON objects; I know the lines will be strings, and I just want it to work before I try JSON. I am not sure what to put as the port; I assume that is the problem. SparkConf conf...

Scala/Spark wait for one function to complete before output the results

scala,apache-spark,static-methods,rdd
I have the following utility function in scala: object MyUtiltity { def processData(data1: org.apache.spark.rdd.RDD[String], data2: org.apache.spark.rdd.RDD[String], data3: org.apache.spark.rdd.RDD[String]) = { function1(data1, data3) function2(data2, data3) } private def function1 {...} private def function2 {...} } And in my main job, I call: MyUtility.processData(data1, data2, data3) data3.saveAsTextFile("myOutput") It seems that data3 was...

How to control implicit caching of RDDs by Spark?

caching,apache-spark,persist,rdd
As a newbie to Spark, I have been looking at their Python example for estimation of PI. I am interested in understanding Spark's performance by re-estimating PI several times within the same context. What I observe is that the value of PI is unchanged across these re-estimations, and the...

Save RDD into a Mongo database using Java

mongodb,hadoop,spark-streaming,rdd
I am attempting to save tweets in MongoDB using Java. This is what I have: JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration( 3000)); JavaDStream<Status> tweets = TwitterUtils.createStream(ssc); JavaDStream<String> statuses = tweets .map(new Function<Status, String>() { public String call(Status status) { return status.getUser().getName() + ":" + status.getText(); } }); JavaDStream<String> users...

Spark & Scala: can't get MappedRDD to perform groupByKey from RDD

scala,apache-spark,rdd
I am facing a disappointing issue while trying to use groupByKey or any function of a PairRDD or MappedRDD. What I get is that I always have just an RDD, and I don't know how to convert it (really, I am quite sure that the conversion should be automatically detected...

Process multiple 'lines' in apache-spark RDD

apache-spark,rdd
I am very new to Spark and I have a question. I try to do a simple sentiment analysis with some data. In the data-file every line contains a product-review. Here is my code for processing one line: // wordlist val pos_file = "/user/cloudera/Data/pos_list.txt" val neg_file = "/user/cloudera/Data/neg_list.txt" val pos_words...

Apache Spark: applying map transformations on RDDs

apache-spark,bigdata,rdd
I have a HadoopRDD from which I'm creating a first RDD with a simple map function, then a second RDD from the first RDD with another simple map function. Something like: HadoopRDD -> RDD1 -> RDD2. My question is whether Spark will iterate over the HadoopRDD record by record...

Spark JSON text field to RDD

scala,cassandra,apache-spark,rdd
I've got a Cassandra table with a field of type text named snapshot, containing JSON objects: [identifier, timestamp, snapshot] I understood that to be able to do transformations on that field with Spark, I need to convert that field of that RDD to another RDD to make transformations on the...

Is there an “Explain RDD” in Spark?

apache-spark,rdd
In particular, if I say rdd3 = rdd1.join(rdd2), then when I call rdd3.collect, depending on the Partitioner used, either data is moved between node partitions, or the join is done locally on each partition (or, for all I know, something else entirely). This depends on what the RDD paper calls...
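The closest built-in to an "explain" for plain RDDs is toDebugString, which prints the lineage, including any shuffle boundaries; a sketch:

val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b")))
val rdd2 = sc.parallelize(Seq((1, "x"), (3, "y")))
val rdd3 = rdd1.join(rdd2)      // Spark < 1.3: import org.apache.spark.SparkContext._
println(rdd3.toDebugString)     // a ShuffledRDD in the lineage indicates data movement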

Configuring Executor memory and number of executors per Worker node

apache-spark,rdd
How do I configure executor memory in a Spark cluster? Also, how do I configure the number of executors per worker node? Is there any way to know how much executor memory is free to cache or persist new RDDs?...

Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?

apache-spark,spark-streaming,rdd
Will rdd1.join(rdd2) cause a shuffle to happen if rdd1 and rdd2 have the same partitioner?
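One way to experiment with this, as a sketch: partition both RDDs with the same HashPartitioner, persist them, and compare the lineage of the join with and without the shared partitioner.

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(8)
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(p).cache()
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(p).cache()
val joined = rdd1.join(rdd2)
println(joined.toDebugString)   // compare against the plan when no common partitioner is set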

Writing RDD partitions to individual parquet files in its own directory

apache-spark,rdd,apache-spark-sql,parquet
I am struggling with a step where I want to write each RDD partition to a separate Parquet file in its own directory. An example would be: <root> <entity=entity1> <year=2015> <week=45> data_file.parquet The advantage of this format is that I can use it directly in Spark SQL as columns, and I will not have to repeat...

Map an RDD to a PairRDD in Scala

java,scala,apache-spark,rdd
I am trying to map an RDD to a PairRDD in Scala so that I can use reduceByKey later. Here is what I did: userRecords is of type org.apache.spark.rdd.RDD[UserElement]. I try to create a PairRDD from userRecords as below: val userPairs: PairRDDFunctions[String, UserElement] = userRecords.map { t => val nameKey: String = t.getName() (nameKey,...
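The pair-RDD methods come from an implicit conversion, so the declared type can stay RDD[(String, UserElement)] rather than PairRDDFunctions; a runnable sketch with a stand-in UserElement (the real class comes from the question):

case class UserElement(name: String) { def getName(): String = name }   // stand-in for the question's class

val userRecords = sc.parallelize(Seq(UserElement("ann"), UserElement("bob"), UserElement("ann")))
val userPairs: org.apache.spark.rdd.RDD[(String, UserElement)] = userRecords.map(t => (t.getName(), t))
val countsPerUser = userPairs.mapValues(_ => 1).reduceByKey(_ + _)      // Spark < 1.3: import org.apache.spark.SparkContext._
countsPerUser.collect().foreach(println)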

In a Spark YARN cluster, how does the number of containers depend on the number of RDD partitions?

apache,hadoop,apache-spark,yarn,rdd
I have a problem with Apache Spark (YARN cluster). In the code below, although I create 10 partitions, only 3 containers work in the YARN cluster. val sc = new SparkContext(new SparkConf().setAppName("Spark Count")) val sparktest = sc.textFile("/spark_test/58GB.dat",10) val test = sparktest.flatMap(line=> line.split(" ")).map(word=>(word, 1)) In a Spark YARN cluster, how does...