
Big RDD versus Multiple small RDDs


Tags: apache-spark, partitioning, rdd

Historic data:

  1. I have multiple tables, one per activity, that contain historic information such as GRPs and CPPs.
  2. For each activity, GRPs and CPPs are defined across multiple dimensions.
    Dimensions: Geography, TimePeriod, Primary_Message
  3. Each activity might contain only a subset of these dimensions.


 Activity1 {Geography, TimePeriod, GRP, CPP}

 Activity2 {TimePeriod, GRP, CPP}

 Activity3 {Primary_Message, TimePeriod, GRP, CPP}

Use cases:

  1. Sometimes I would like to view the data across time periods for all activities (this dimension is available in every activity).

  2. Sometimes I would like to view the data across geographies (this dimension is present in only a few activities).
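The two use cases differ in how many activities they touch: TimePeriod exists in every activity, Geography in only some. A small plain-Python sketch of that mapping, taken from the Activity1 to Activity3 schemas above (the dictionary name and the helper function are hypothetical, not part of any Spark API):

```python
# Dimensions per activity, copied from the Activity1..Activity3 schemas in the
# question. ACTIVITY_DIMENSIONS and activities_with() are hypothetical names.
ACTIVITY_DIMENSIONS = {
    "Activity1": {"Geography", "TimePeriod"},
    "Activity2": {"TimePeriod"},
    "Activity3": {"Primary_Message", "TimePeriod"},
}


def activities_with(dimension):
    """Return the activities (sorted by name) whose table has `dimension`."""
    return sorted(
        activity
        for activity, dims in ACTIVITY_DIMENSIONS.items()
        if dimension in dims
    )


if __name__ == "__main__":
    # Use case 1: TimePeriod is present everywhere, so every activity is involved.
    print(activities_with("TimePeriod"))  # ['Activity1', 'Activity2', 'Activity3']
    # Use case 2: Geography exists only in Activity1.
    print(activities_with("Geography"))   # ['Activity1']
```

Only the activities returned for a dimension can contribute to a view across that dimension.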

I have to design the RDDs so that all of these use cases work efficiently.

At any given point in time, each job will cater to a single activity.

I have two options:

  1. Create one RDD for each activity and partition it across the dimensions in that table. I will then have as many RDDs as I have activities.
    For each job, I will access the specific RDD and compute.

  2. Create a single RDD for all the activities and partition it on some dimension.
    For each job, I will access the single big RDD, filter it for one activity, and do the computations.
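A minimal cost model of the two options, written in plain Python rather than Spark: lists stand in for RDDs, and the sample rows, GRP values, and function names are hypothetical. It mirrors the fact that a plain `filter` on a Spark RDD evaluates its predicate against every element, so option 2 touches every row even though only one activity's rows survive the filter.

```python
from collections import defaultdict

# Hypothetical sample rows: (activity, time_period, grp). Plain lists stand in
# for RDDs; this models how many rows each job touches, it is not Spark code.
ROWS = [
    ("Activity1", "2015-Q1", 10.0),
    ("Activity1", "2015-Q2", 12.0),
    ("Activity2", "2015-Q1", 5.0),
    ("Activity3", "2015-Q1", 7.0),
    ("Activity3", "2015-Q2", 3.0),
]


def grp_by_period(rows):
    """Sum GRP per time period (use case 1)."""
    totals = defaultdict(float)
    for _activity, period, grp in rows:
        totals[period] += grp
    return dict(totals)


# Option 1: split once, up front, into one collection per activity.
PER_ACTIVITY = defaultdict(list)
for row in ROWS:
    PER_ACTIVITY[row[0]].append(row)


def job_option1(activity):
    """Return (GRP per period, rows touched); reads only this activity's rows."""
    rows = PER_ACTIVITY.get(activity, [])
    return grp_by_period(rows), len(rows)


def job_option2(activity):
    """Return (GRP per period, rows touched); filters the whole collection first."""
    rows = [r for r in ROWS if r[0] == activity]
    return grp_by_period(rows), len(ROWS)


if __name__ == "__main__":
    for name in ("Activity1", "Activity2", "Activity3"):
        print(name, job_option1(name), job_option2(name))
```

Both options return the same aggregates; they differ only in how many rows each job has to read.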

My question is: given these use cases and assumptions, which option is the more efficient RDD design?



The advantage of a single RDD is that adding another activity type (Activity 4) would take little work. The advantage of separate RDDs is that when you only want to access one activity type, you do not have to go through the data for the other types.

As you say in a comment:

On the UI, data will be displayed for all the activities. In order to achieve this, we would run multiple jobs (one job per activity) and compute their values across time periods.

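If you have N activity types and M total records, rendering the UI with separate RDDs means going through M records in total, because each job reads only its own activity's records. With a single RDD you would go through N×M records, because each of the N jobs has to filter all M records to find the ones for its activity.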
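The arithmetic behind that comparison, writing m_i for the number of records of activity i (a symbol introduced here for illustration), so that M is the sum of m_i over the N activities:

```latex
\begin{aligned}
M &= \sum_{i=1}^{N} m_i \\
\text{separate RDDs:}\quad \sum_{i=1}^{N} m_i &= M \\
\text{single RDD:}\quad \sum_{i=1}^{N} M &= N \times M
\end{aligned}
```

For example, with three activities holding 2, 1 and 2 records, rendering the UI reads 5 records with separate RDDs but 3 × 5 = 15 records with a single RDD.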
