

Spark SQL JSON Boolean Evaluation

apache-spark,pyspark
I have the example JSON Schema (cut off due to size): |-- LinearScheduleResult: struct (nullable = true) | |-- Build: string (nullable = true) | |-- EndTimestamp: string (nullable = true) | |-- Errors: array (nullable = true) | | |-- element: string (containsNull = true) | |-- RequestId: string...

PySpark Create Multi Indexed Paired RDD with function

apache-spark,rdd,pyspark
A little while ago I asked a question about organizing and structuring an RDD with multiple keys. See PySpark Suggestion on how to organize RDD. Each object in my current RDD contains a start_time, end_time, id, and position. I want to group the objects by id and time. I...

PySpark distributed processing on a YARN cluster

apache-spark,yarn,cloudera-cdh,pyspark
I have Spark running on a Cloudera CDH5.3 cluster, using YARN as the resource manager. I am developing Spark apps in Python (PySpark). I can submit jobs and they run successfully, however they never seem to run on more than one machine (the local machine I submit from). I have...

PySpark: filtering out RDD elements fails on 'NoneType' object is not iterable

apache-spark,pyspark
I want to filter out elements of an RDD where the field 'string' is not equal to 'OK'. I create my RDD from a set of CSV files on HDFS, then use map to get the structure I want before trying to filter: import csv, StringIO files = "/hdfs_path/*.csv" fields...
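A common cause of the 'NoneType' failure is a map function that does not return a value on every path. A minimal sketch of that idea, assuming an existing SparkContext sc and that the status field sits in the first CSV column (both assumptions, since the excerpt is cut off):

```
import csv, StringIO

def parse_line(line):
    # csv.reader yields lists of fields; always return the parsed row
    return next(csv.reader(StringIO.StringIO(line)))

rows = sc.textFile("/hdfs_path/*.csv").map(parse_line)
ok_rows = rows.filter(lambda fields: fields[0] == 'OK')  # field position is an assumption
```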

HDFS / Hadoop api access from pyspark worker

apache-spark,hdfs,pyspark
I need to read/scan/write files to/from the hdfs from within a pyspark worker. Note the following api's are not applicable since they run off of the driver: sc.textFile() sc.saveAsParquetFile() etc It would be very much preferable not to involve additional third party libraries (e.g. pyhadoop). One option is to shell...

Removing duplicates from Spark RDDPair values

python,apache-spark,pyspark
I am new to Python and also Spark. I have a pair RDD containing (key, List), but some of the values are duplicates. The RDD is of the form (zipCode, streets). I want a pair RDD which does not contain duplicates. I am trying to achieve it using Python. Can anyone please help...
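A minimal sketch, assuming the RDD holds (zipCode, list-of-streets) pairs as described: de-duplicating each value list is a mapValues over set().

```
pairs = sc.parallelize([("90210", ["Main St", "Main St", "Oak Ave"])])  # hypothetical sample
deduped = pairs.mapValues(lambda streets: list(set(streets)))
# [("90210", ["Main St", "Oak Ave"])] (order within the list not guaranteed)
```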

Spark reduceByKey on several different values

python,apache-spark,pyspark
I have a table stored as an RDD of lists, on which I want to perform something akin to a groupby in SQL or pandas, taking the sum or average for every variable. The way I currently do it is this (untested code): l=[(3, "add"),(4, "add")] dict={} i=0 for aggregation...
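A sketch of the usual reduceByKey pattern, assuming rows shaped like (key, value): sum directly, and carry a (sum, count) pair through the reduce for the average.

```
from operator import add

rows = sc.parallelize([("add", 3.0), ("add", 4.0), ("mul", 2.0)])  # hypothetical sample
sums = rows.reduceByKey(add)
avgs = (rows.mapValues(lambda v: (v, 1.0))
            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
            .mapValues(lambda s: s[0] / s[1]))
```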

Connecting from Spark/pyspark to PostgreSQL

postgresql,jdbc,jar,apache-spark,pyspark
I've installed Spark on a Windows machine and want to use it via Spyder. After some troubleshooting, the basics seem to work: import os os.environ["SPARK_HOME"] = "D:\Analytics\Spark\spark-1.4.0-bin-hadoop2.6" from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext spark_config = SparkConf().setMaster("local[8]") sc = SparkContext(conf=spark_config) sqlContext = SQLContext(sc) textFile = sc.textFile("D:\\Analytics\\Spark\\spark-1.4.0-bin-hadoop2.6\\README.md") textFile.count() textFile.filter(lambda...

PySpark error: “Input path does not exist”

apache-spark,pyspark
I am new to Spark and I code in Python. Following my "Learning Spark" guidelines exactly, I see "You don't need to have Hadoop installed to run Spark." Yet when I simply try to count the lines in one file using PySpark, I get the following error. What am I...

Does the DStream returned by the updateStateByKey function contain only one RDD?

apache-spark,spark-streaming,apache-spark-sql,pyspark
Does the DStream returned by the updateStateByKey function contain only one RDD? If not, under what circumstances will the DStream contain more than one RDD?

Conditionally Combining/Reducing key-pairs

python,apache-spark,pyspark
I've had this issue for some time now, and I think it has to do with my lack of understanding of how to use combineByKey and reduceByKey, so hopefully somebody can clear this up. I am working with DNA sequences, so I have a procedure to produce a bunch of...

Are 'local[n]' pyspark applications affected by the GIL?

python,apache-spark,pyspark,gil
Generally, Python doesn't work well with multithreading because of the Global Interpreter Lock. Does this also affect pyspark applications running in multithreaded local mode (local[n])?...

Run Spark SQL on CDH 5.4.1: NoClassDefFoundError

hive,apache-spark,apache-spark-sql,pyspark
I set up my CDH 5.4.1 cluster to run some Spark SQL tests. Spark works well, but Spark SQL has some issues. I start pyspark as below: /opt/cloudera/parcels/CDH-5.4.1-1.cdh5.4.1.p0.6/lib/spark/bin/pyspark --master yarn-client I want to select a table in Hive with Spark SQL: results = sqlCtx.sql("SELECT * FROM my_table").collect() It prints error logs:...

reduceByKey with two columns in Spark

python,apache-spark,reduce,pyspark
I am trying to group by two columns in Spark and am using reduceByKey as follows: pairsWithOnes = (rdd.map(lambda input: (input.column1,input.column2, 1))) print pairsWithOnes.take(20) The above map command works fine and produces three columns with the third one being all ones. I tried summing the third by the first...
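A sketch of the usual fix, assuming rdd rows expose column1 and column2 as in the snippet: make the two columns the key as a tuple, then reduce.

```
from operator import add

pairsWithOnes = rdd.map(lambda row: ((row.column1, row.column2), 1))
counts = pairsWithOnes.reduceByKey(add)   # [((column1, column2), n), ...]
```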

pySpark Data Frames: "assert isinstance(dataType, DataType), 'dataType should be DataType'"

apache-spark,dataframes,pyspark
So I want to generate my DataFrame schema dynamically, but I get the following error: assert isinstance(dataType, DataType), "dataType should be DataType" AssertionError: dataType should be DataType code: filteredSchema = [] for line in correctSchema: fieldName = line.split(',') if fieldName[1] == "decimal": filteredSchema.append([fieldName[0], "DecimalType()"]) elif fieldName[1] == "string": filteredSchema.append([fieldName[0], "StringType()"])...
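The assertion fires because the list holds the strings "DecimalType()"/"StringType()" rather than DataType instances. A sketch of the corrected loop, reusing the question's correctSchema:

```
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

fields = []
for line in correctSchema:          # correctSchema as defined in the question
    parts = line.split(',')
    if parts[1] == "decimal":
        fields.append(StructField(parts[0], DecimalType(), True))
    else:
        fields.append(StructField(parts[0], StringType(), True))
schema = StructType(fields)
```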

How to pass list of values, json pyspark

apache-spark,apache-spark-sql,pyspark,sparklines
>>> from pyspark.sql import SQLContext >>> sqlContext = SQLContext(sc) >>> rdd =sqlContext.jsonFile("tmp.json") >>> rdd_new= rdd.map(lambda x:x.name,x.age) It's working properly. But there is a list of values list1=["name","age","gene","xyz",.....] When I pass For each_value in list1: `rdd_new=rdd.map(lambda x:x.each_value)` I get an error ...
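A sketch of one way to do this, assuming rdd is the jsonFile result whose Rows carry the listed fields: build a single projection over the whole list with getattr rather than re-mapping per field name.

```
list1 = ["name", "age", "gene"]   # shortened; the real list is longer
projected = rdd.map(lambda row: tuple(getattr(row, field) for field in list1))
```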

How to use spark for map-reduce flow to select N columns, top M rows of all csv files under a folder?

hadoop,mapreduce,apache-spark,spark-streaming,pyspark
To be concrete, say we have a folder with 10k tab-delimited csv files with the following attribute format (each csv file is about 10GB): id name address city... 1 Matt add1 LA... 2 Will add2 LA... 3 Lucy add3 SF... ... And we have a lookup table based on "name"...

SparkSQL DataFrame: dropna() does not work

apache-spark,pyspark
Here's a Spark DataFrame: from pyspark.sql import SQLContext from pyspark.sql.types import * sqlContext = SQLContext(sc) data = sc.parallelize([('Foo',41,'US',3), ('Foo',39,'UK',1), ('Bar',57,'CA',2), ('Bar',72,'CA',3), ('Baz',22,'US',6), (None,75,None,7)]) schema = StructType([StructField('Name', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(data,schema) data.show() Name Age Country Score Foo 41 US 3 Foo...
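For reference, dropna()/na.drop() live on the DataFrame (and only in sufficiently recent 1.3.x/1.4 releases), and show() also belongs on the DataFrame rather than the source RDD. A sketch assuming such a release:

```
df.show()                # data is a plain RDD and has no show()
cleaned = df.dropna()    # or, equivalently, df.na.drop()
cleaned.show()
```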

pySpark DataFrames Aggregation Functions with SciPy

apache-spark,dataframes,pyspark
I've tried a few different scenarios to try and use Spark's 1.3 DataFrames to handle things like sciPy kurtosis or numpy std. Here is the example code, but it just hangs on a 10x10 dataset (10 rows with 10 columns). I've tried: print df.groupBy().agg(kurtosis(df.offer_id)).collect() print df.agg(kurtosis(df.offer_ID)).collect() But this works no...

Handling bad items in map function in Spark

exception-handling,apache-spark,pyspark
What is an elegant way to deal with exceptions in map functions in Spark? For example with: exampleRDD= ["1","4","7","2","err",3] exampleRDD=exampleRDD.map(lambda x: int(x)) This will not work because it will fail on the "err" item. How can I filter out faulty rows and execute map on the rest, without anticipating the...
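One common pattern is to wrap the conversion and use flatMap, so bad items simply emit nothing; a minimal sketch:

```
def safe_int(x):
    try:
        return [int(x)]
    except (ValueError, TypeError):
        return []

raw = sc.parallelize(["1", "4", "7", "2", "err", "3"])
cleaned = raw.flatMap(safe_int)   # [1, 4, 7, 2, 3]
```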

How does Spark interoperate with CPython

scala,pandas,apache-spark,interop,pyspark
I have an Akka system written in scala that needs to call out to some Python code, relying on Pandas and Numpy, so I can't just use Jython. I noticed that Spark uses CPython on its worker nodes, so I'm curious how it executes Python code and whether that code...

Error: Must specify a primary resource (JAR or Python file) - Spark submit Python app

python,deployment,apache-spark,pyspark
I want to complete one simple task. I have a set of workers. I want to deploy a zip archive which contains a set of Python files. Then, I want to send some command, and, after some time, I want to get the result. Anyway, I submit my files to the workers: spark-submit --master spark://User-PC:7077...

PySpark reduceByKey? to add Key/Tuple

python,apache-spark,pyspark
I have the following data and what I want to do is [(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'),...

Spark - WARN LoadSnappy: Snappy native library not loaded

pyspark,snappy
Trying to run an exercise from Spark Summit 2014. I keep getting the following when running the command in terminal: Spark assembly has been built with Hive, including Datanucleus jars on classpath Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 15/04/26 14:21:05 WARN NativeCodeLoader:...

Show partitions on a pyspark RDD

apache-spark,pyspark
The pyspark RDD documentation http://spark.apache.org/docs/1.2.1/api/python/pyspark.html#pyspark.RDD does not show any methods to display partition information for an RDD. Is there any way to get that information without executing an additional step, e.g.: myrdd.mapPartitions(lambda x: iter[1]).sum() The above does work, but it seems like extra effort....
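A sketch of the lighter-weight options: getNumPartitions() reports the count, and glom() exposes the per-partition contents if the actual split needs inspecting.

```
myrdd.getNumPartitions()          # number of partitions
myrdd.glom().map(len).collect()   # element count in each partition
```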

More jobs than expected running in Apache Spark

apache-spark,bigdata,pyspark
I am trying to learn apache-spark. This is the code I am trying to run, using the pyspark API. data = xrange(1, 10000) xrangeRDD = sc.parallelize(data, 8) def ten(value): """Return whether value is below ten. Args: value (int): A number. Returns: bool: Whether `value` is less than ten....

Is there a way to mimic R's higher order (binary) function shorthand syntax within spark or pyspark?

r,apache-spark,pyspark
In R, I can write the following: ## Explicit Reduce(function(x,y) x*y, c(1, 2, 3)) # returns 6 However, I can also do this less explicitly with the following: ## Less explicit Reduce(`*`, c(1, 2, 3)) # also returns 6 In pyspark, I could do the following: rdd = sc.parallelize([1, 2,...
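The closest PySpark analogue is Python's operator module, which supplies the binary functions R gets from backticks; a minimal sketch:

```
from operator import mul

rdd = sc.parallelize([1, 2, 3])
rdd.reduce(mul)   # 6, no lambda needed
```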

PySpark Streaming example does not seem to terminate

python,apache-spark,spark-streaming,pyspark
I am trying to understand the Python API of Spark Streaming by a simple example. from pyspark.streaming import StreamingContext dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.9, 0.9]] dvc = [sc.parallelize(i, 1) for i in dvc] ssc = StreamingContext(sc, 2.0) input_stream = ssc.queueStream(dvc) def get_output(rdd): print(rdd.collect()) input_stream.foreachRDD(get_output) ssc.start() This...

How to access SparkContext in pyspark script

apache-spark,pyspark
The following SOF question How to run script in Pyspark and drop into IPython shell when done? tells how to launch a pyspark script: %run -d myscript.py But how do we access the existing Spark context? Just creating a new one does not work: ----> sc = SparkContext("local", 1) ValueError:...

Pyspark: using filter for feature selection

python,apache-spark,pyspark
I have an array of dimensions 500 x 26. Using the filter operation in pyspark, I'd like to pick out the columns which are listed in another array at row i. Ex: if a[i]= [1 2 3] Then pick out columns 1, 2 and 3 and all rows. Can this...
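If the goal is simply projecting a subset of columns from each row, a map with the wanted indices captured in the closure is usually enough; a sketch with hypothetical indices:

```
wanted = [1, 2, 3]                                        # hypothetical column indices
selected = rdd.map(lambda row: [row[i] for i in wanted])  # all rows, chosen columns
```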

Should I use registerDataFrameAsTable in Spark SQL?

apache-spark,apache-spark-sql,pyspark
During migration from PySpark to Spark with Scala I encountered a problem caused by the fact that SqlContext's registerDataFrameAsTable method is private. It made me think that my approach might be incorrect. In PySpark I do the following: load each table: df = sqlContext.load(source, url, dbtable), then register each sqlContext.registerDataFrameAsTable(df,...

Join two (non)paired RDDs to make a DataFrame

apache-spark,rdd,apache-spark-sql,pyspark
As the title describes, say I have two RDDs rdd1 = sc.parallelize([1,2,3]) rdd2 = sc.parallelize([1,0,0]) or rdd3 = sc.parallelize([("Id", 1),("Id", 2),("Id",3)]) rdd4 = sc.parallelize([("Result", 1),("Result", 0),("Result", 0)]) How can I create the following DataFrame? Id Result 1 1 2 0 3 0 If I could create the paired RDD [(1,1),(2,0),(3,0)]...
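One sketch, assuming both RDDs line up element-for-element (zip requires identical partitioning and per-partition counts, which holds for two parallelize calls of the same length) and an existing sqlContext:

```
from pyspark.sql import Row

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([1, 0, 0])
rows = rdd1.zip(rdd2).map(lambda p: Row(Id=p[0], Result=p[1]))
df = sqlContext.createDataFrame(rows)
```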

Syntax while setting schema for Pyspark.sql using StructType

apache-spark,pyspark
I am new to spark and was playing around with Pyspark.sql. According to the pyspark.sql documentation here, one can go about setting the Spark dataframe and schema like this: rdd = sc.textFile('./some csv_to_play_around.csv' schema = StructType([StructField('Name', StringType(), True), StructField('DateTime', TimestampType(), True) StructField('Age', IntegerType(), True)]) # create dataframe df3 = sqlContext.createDataFrame(rdd,...
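As quoted, the snippet is missing the closing parenthesis on sc.textFile and the commas between the StructFields; a corrected sketch (the raw text lines would still need to be parsed into typed tuples before createDataFrame will accept this schema):

```
from pyspark.sql.types import (StructType, StructField, StringType,
                               TimestampType, IntegerType)

rdd = sc.textFile('./some csv_to_play_around.csv')
schema = StructType([StructField('Name', StringType(), True),
                     StructField('DateTime', TimestampType(), True),
                     StructField('Age', IntegerType(), True)])
```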

Load CSV file with Spark

python,csv,apache-spark,pyspark
I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing: sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() I would expect this call to give me a list of the first two columns of my file, but I'm getting this...

PySpark repartitioning RDD elements

hadoop,apache-spark,partitioning,rdd,pyspark
I have a spark job that reads from a Kafka stream and performs an action for each RDD in the stream. If the RDD is not empty, I want to save the RDD to HDFS, but I want to create a file for each element in the RDD. I've found...

Apache Spark: Error while starting PySpark

python,hadoop,apache-spark,pyspark
On a CentOS machine, with Python v2.6.6 and Apache Spark v1.2.1, I get the following error when trying to run ./pyspark. It seems to be some issue with Python, but I'm not able to figure it out. 15/06/18 08:11:16 INFO spark.SparkContext: Successfully stopped SparkContext Traceback (most recent call last): File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/shell.py", line 45, in <module> sc =...

lambda for RDD query with if statement

python-2.7,pyspark
I'm creating an RDD which queries access logs. I want to extract all 404 errors (the logs are objects which can be queried). I'm using this code, but it seems I can't use pass in a lambda statement: badRecords = (access_logs.map(lambda log: log if log.response_code == 404 else pass)) print...
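pass is a statement and cannot appear in a lambda; selecting the matching records is really a filter, not a map. A minimal sketch, assuming response_code is an attribute as in the snippet:

```
badRecords = access_logs.filter(lambda log: log.response_code == 404)
```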

Can I change SparkContext.appName on the fly?

apache-spark,pyspark
I know I can use SparkConf.set('spark.app.name',...) to set appName before creating the SparkContext. However, I want to change the name of the application as it progresses, i.e., after SparkContext has been created. Alas, setting sc.appName does not change how the job is shown by yarn application -list. Is there a...

“remoteContext object has no attribute”

amazon-s3,apache-spark,pyspark
I'm running Spark 1.4 in Databricks Cloud. I loaded a file into my S3 instance and mounted it. Mounting worked. But I'm having trouble creating an RDD: dbutils.fs.mount("s3n://%s:%s@%s" % (ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME) Any ideas? sc.parallelize([1,2,3]) rdd = sc.textFiles("/mnt/GDELT_2014_EVENTS/GDELT_2014.csv") ...

Pyspark StructType is not defined

python,apache-spark,pyspark
I'm trying to construct a schema for DB testing, and StructType apparently isn't working for some reason. I'm following a tutorial, and it doesn't import any extra modules. <type 'exceptions.NameError'>, NameError("name 'StructType' is not defined",), <traceback object at 0x2b555f0>) I'm on Spark 1.4.0, and Ubuntu 12, if that has anything...
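In the 1.3/1.4 line the type classes live in pyspark.sql.types and are not pulled in automatically, so the usual fix is an explicit import; a sketch:

```
from pyspark.sql.types import StructType, StructField, StringType
```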

Issue with UDF on a column of Vectors in PySpark DataFrame

apache-spark,apache-spark-sql,pyspark,spark-sql
I am having trouble using a UDF on a column of Vectors in PySpark which can be illustrated here: from pyspark import SparkContext from pyspark.sql import Row from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf from pyspark.mllib.linalg import Vectors FeatureRow = Row('id', 'features') data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])), (1,...

PySpark & MLLib: Class Probabilities of Random Forest Predictions

apache-spark,random-forest,mllib,pyspark
I'm trying to extract the class probabilities of a random forest object I have trained using PySpark. However, I do not see an example of it anywhere in the documentation, nor is it a method of RandomForestModel. How can I extract class probabilities from a RandomForestModel classifier in PySpark?...

How to convert a DataFrame back to normal RDD in pyspark

apache-spark,pyspark
I need to use the (rdd.)partitionBy(npartitions, custom_partitioner) method that is not available on the DataFrame. All of the DataFrame methods refer only to DataFrame results. So then how to create an RDD from the DataFrame data? Note: this is a change (in 1.3.0) from 1.2.0. Update from the answer from...
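A sketch, assuming a DataFrame df: the .rdd attribute gives back an RDD of Row objects, which can then be keyed (the key choice below is hypothetical) and re-partitioned like any other pair RDD.

```
rows = df.rdd                                # RDD of Row objects
keyed = rows.map(lambda row: (row[0], row))  # hypothetical key choice
# keyed.partitionBy(npartitions, custom_partitioner) now applies
```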

Accessing csv file placed in hdfs using spark

csv,hadoop,apache-spark,pyspark
I have placed a csv file into the hdfs filesystem using hadoop -put command. I now need to access the csv file using pyspark csv. Its format is something like `plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')` I am a newbie to hdfs. How do I find the address to be placed in hdfs://x.x.x.x?...

How to extract application ID from the PySpark context

apache-spark,yarn,pyspark
A previous question recommends sc.applicationId, but it is not present in PySpark, only in scala. So, how do I figure out the application id (for yarn) of my PySpark process?...

pyspark only uses half memory ec2-spark

amazon-ec2,apache-spark,pyspark
I am playing around with Spark using the supplied spark-ec2: ./spark-ec2 \ --key-pair=pems \ --identity-file=/path/pems.pem \ --region=eu-west-1 \ -s 8 \ --instance-type c3.xlarge \ launch my-spark-cluster After it has fully installed, I ssh into the master node and start pyspark. $ /root/spark/bin/pyspark --executor-memory 2G I specify...

Is it possible to run Spark (specifically pyspark) in process?

apache-spark,pyspark
When running a pyspark job there's a significant launch overhead. Is it possible to run 'lightweight' jobs that don't use an external daemon? (mainly for testing with small data sets)

PySpark No suitable driver found for jdbc:mysql://dbhost

apache-spark,apache-spark-sql,pyspark
I am trying to write my dataframe to a MySQL table. I am getting No suitable driver found for jdbc:mysql://dbhost when I try to write. As part of the preprocessing I read from other tables in the same DB and have no issues doing that. I can do the full run...

pyspark how to load compressed snappy file

apache-spark,pyspark,snappy
I have compressed a file using python-snappy and put it in my hdfs store. I am now trying to read it in like so but I get the following traceback. I can't find an example of how to read the file in so I can process it. I can read...

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

apache-spark,apache-spark-sql,pyspark
Let's say I have a rather large dataset in the following form: data = sc.parallelize([('Foo',41,'US',3), ('Foo',39,'UK',1), ('Bar',57,'CA',2), ('Bar',72,'CA',2), ('Baz',22,'US',6), ('Baz',36,'US',6)]) What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only. Removing entirely duplicate rows is straightforward: data = data.distinct()...
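A sketch of the RDD-side approach, reusing the question's data: key each row by the columns that define a duplicate, keep one row per key, and drop the key again. (On 1.4 DataFrames, dropDuplicates with a column list should be the shorter route.)

```
deduped = (data.map(lambda row: ((row[0], row[2], row[3]), row))
               .reduceByKey(lambda a, b: a)   # keep an arbitrary row per key
               .values())
```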

How to set hadoop configuration values from pyspark

apache-spark,pyspark
The Scala version of SparkContext has the property sc.hadoopConfiguration I have successfully used that to set hadoop properties (in scala..) e.g. sc.hadoopConfiguration.set("my.mapreduce.setting","someVal") However the python version of SparkContext lacks that accessor. Is there any way to set hadoop configuration values into the Hadoop Configuration used by the pyspark context? ...
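A sketch of the commonly used workaround, which goes through the private _jsc gateway (an implementation detail rather than a public API, so treat it as unsupported):

```
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("my.mapreduce.setting", "someVal")
```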

What to set `SPARK_HOME` to?

python,apache-spark,pythonpath,pyspark,apache-zeppelin
Installed apache-maven-3.3.3, scala 2.11.6, then ran: $ git clone git://github.com/apache/spark.git -b branch-1.4 $ cd spark $ build/mvn -DskipTests clean package Finally: $ git clone https://github.com/apache/incubator-zeppelin $ cd incubator-zeppelin/ $ mvn install -DskipTests Then ran the server: $ bin/zeppelin-daemon.sh start Running a simple notebook beginning with %pyspark, I got an error...

pySpark .reduceByKey(min)/max weird behavior

apache-spark,pyspark
I have the following function: minTotal = numRDD.reduceByKey(min).collect() maxTotal = numRDD.reduceByKey(max).collect() A sample from my dataset that is acting strangely: (18, [u'300.0', u'1000.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'1000.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0', u'300.0',...
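The likely culprit is that the values are unicode strings, so min/max compare lexicographically (u'1000.0' < u'300.0'); casting before the reduce usually resolves it. A sketch:

```
floats = numRDD.mapValues(float)
minTotal = floats.reduceByKey(min).collect()
maxTotal = floats.reduceByKey(max).collect()
```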

spark reduce and map issue

python,apache-spark,pyspark
I am doing a small experiment in Spark and I am having trouble. wordCounts is: [('rat', 2), ('elephant', 1), ('cat', 2)] # TODO: Replace <FILL IN> with appropriate code from operator import add totalCount = (wordCounts .map(lambda x: (x,1)) <==== something wrong with this line maybe .reduce(sum)) <==== something wrong...
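A sketch of the intended fill-in: pull out the counts and reduce with a binary add (reduce takes a two-argument function, not sum).

```
from operator import add

wordCounts = sc.parallelize([('rat', 2), ('elephant', 1), ('cat', 2)])
totalCount = wordCounts.map(lambda pair: pair[1]).reduce(add)   # 5
```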

Include package in Spark local mode

python,apache-spark,py.test,pyspark
I'm writing some unit tests for my Spark code in python. My code depends on spark-csv. In production I use spark-submit --packages com.databricks:spark-csv_2.10:1.0.3 to submit my python script. I'm using pytest to run my tests with Spark in local mode: conf = SparkConf().setAppName('myapp').setMaster('local[1]') sc = SparkContext(conf=conf) My question is, since...

Reshaping/Pivoting data in Spark RDD and/or Spark DataFrames

apache-spark,apache-spark-sql,pyspark
I have some data in the following format (either RDD or Spark DataFrame): from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df...

Calculating duration by subtracting two datetime columns in string format

apache-spark,apache-spark-sql,pyspark
I have a Spark DataFrame that consists of a series of dates: from pyspark.sql import SQLContext from pyspark.sql import Row from pyspark.sql.types import * sqlContext = SQLContext(sc) import pandas as pd rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'), ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'), ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),...
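One sketch of a usual approach, with hypothetical column names and a format string guessed from the sample timestamps: parse both strings in a UDF and return the difference in seconds (total_seconds() needs Python 2.7+).

```
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

fmt = '%Y-%m-%dT%H:%M:%S.%f'   # assumed from the sample data
duration = udf(lambda end, start:
               (datetime.strptime(end, fmt) -
                datetime.strptime(start, fmt)).total_seconds(),
               DoubleType())
# df and the two column names below are hypothetical
df2 = df.withColumn('Duration', duration(df.EndDateTime, df.StartDateTime))
```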

Emit multiple pairs in map operation

apache-spark,pyspark
Let's say I have rows of phone call records in the format: [CallingUser, ReceivingUser, Duration] I want to know the total amount of time that a given user has been on the phone (the sum of Duration where the user was the CallingUser or the ReceivingUser). Effectively, for a given record,...
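A minimal sketch: flatMap emits two pairs per record, one for each side of the call, and the durations are then summed per user (the sample names are made up).

```
from operator import add

calls = sc.parallelize([("alice", "bob", 30), ("bob", "carol", 10)])  # hypothetical sample
perUser = (calls.flatMap(lambda r: [(r[0], r[2]), (r[1], r[2])])
                .reduceByKey(add))
# [('alice', 30), ('bob', 40), ('carol', 10)] in some order
```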

Usage of local variables in closures when accessing Spark RDDs

closures,apache-spark,rdd,pyspark
I have a question regarding the usage of local variables in closures when accessing Spark RDDs. The problem I would like to solve looks as follows: I have a list of textfiles that should be read into an RDD. However, first I need to add additional information to an RDD...

getting number of visible nodes in PySpark

apache-spark,pyspark
I'm running some operations in PySpark, and recently increased the number of nodes in my configuration (which is on Amazon EMR). However, even though I tripled the number of nodes (from 4 to 12), performance seems not to have changed. As such, I'd like to see if the new nodes...

Spark: use reduceByKey instead of groupByKey and mapByValues

python,apache-spark,pyspark
I have an RDD with duplicate values in the following format: [ {key1: A}, {key1: A}, {key1: B}, {key1: C}, {key2: B}, {key2: B}, {key2: D}, ..] I would like the new RDD to have the following output and to get rid of duplicates. [ {key1: [A,B,C]}, {key2: [B,D]}, ..]...
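A sketch of the reduceByKey version: build singleton sets and merge them with a set union, which aggregates per key and drops duplicates in one pass.

```
pairs = sc.parallelize([("key1", "A"), ("key1", "A"),   # simplified sample
                        ("key1", "B"), ("key2", "B")])
merged = (pairs.mapValues(lambda v: set([v]))
               .reduceByKey(lambda a, b: a | b)
               .mapValues(list))
# [("key1", ["A", "B"]), ("key2", ["B"])] (list order not guaranteed)
```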

Call Distinct on 'pyspark.resultiterable.ResultIterable'

python,apache-spark,pyspark
I am writing some Spark code and I have an RDD which looks like [(4, <pyspark.resultiterable.ResultIterable at 0x9d32a4c>), (1, <pyspark.resultiterable.ResultIterable at 0x9d32cac>), (5, <pyspark.resultiterable.ResultIterable at 0x9d32bac>), (2, <pyspark.resultiterable.ResultIterable at 0x9d32acc>)] What I need to do is call distinct on the pyspark.resultiterable.ResultIterable. I tried this: def distinctHost(a, b): p...
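Since a ResultIterable is just an iterable of the grouped values, wrapping it in set() inside mapValues is usually enough; a sketch, where grouped stands for the groupByKey result shown above:

```
distinct_per_key = grouped.mapValues(lambda values: list(set(values)))
```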

pySpark find Median in a distributed way?

apache-spark,pyspark
Is it possible to find median in spark in a distributed way? I am currently finding: Sum, Average, Variance, Count using the following code: dataSumsRdd = numRDD.filter(lambda x: filterNum(x[1])).map(lambda line: (line[0], float(line[1])))\ .aggregateByKey((0.0, 0.0, 0.0), lambda (sum, sum2, count), value: (sum + value, sum2 + value**2, count+1.0), lambda (suma, sum2a,...