PySpark repartitioning RDD elements

Question:

Tags: hadoop, apache-spark, partitioning, rdd, pyspark

I have a Spark job that reads from a Kafka stream and performs an action for each RDD in the stream. If an RDD is not empty, I want to save it to HDFS, but I want to create a file for each element in the RDD. I've found that

RDD.saveAsTextFile(file_location)

will create a file for each partition, so I am trying to change the RDD so that each partition contains exactly one element. Here's an example of what I'm trying to do:

data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'])
data.glom().collect() #Produces [['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0']]
data.saveAsTextFile(file_location) #Produces 2 files

I can get closer to what I want, but I can't find a way to ensure that each partition has exactly one element:

data1 = data.coalesce(1, True).repartition(data.count())
data1.glom().collect() #Produces [[], ['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0'], [], [], [], [], [], [], []] 
data2 = data.map(lambda t : t).coalesce(1, True).repartition(data.count())
data2.glom().collect() #Produces [[], ['1'], ['2', '3'], ['4', '5'], ['6'], ['7', '8'], ['9', '0'], [], [], []] 
data2.saveAsTextFile(file_location) #Produces 10 files, but some are empty

I know that in this example I can pass the desired number of partitions to sc.parallelize(), but that won't be possible when I am reading from a Kafka stream. Any recommendations on how to repartition the way I want, or on how to better approach this?
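
(For reference, setting the partition count directly in the toy example looks like the snippet below; numSlices is the second argument to parallelize. It just doesn't apply to an RDD that arrives from the stream.)

data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'], 10)
data.glom().collect()  # [['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9'], ['0']]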


Answer:

Well, here is a Python solution for custom partitioning.

(Just to be clear, writing each element to a separate file is probably not the best design.)

# Turn each element into a (key, value) pair so it can be partitioned by key.
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0']).map(lambda x: (x, x))
print(data.collect())
c = data.count()
# Custom partitioner: key k is sent to partition int(k) % c, i.e. one element per partition here.
wp = data.partitionBy(c, lambda k: int(k))
print(wp.map(lambda t: t[0]).glom().collect())
sc.stop()

Result:

[('1', '1'), ('2', '2'), ('3', '3'), ('4', '4'), ('5', '5'), ('6', '6'), ('7', '7'), ('8', '8'), ('9', '9'), ('0', '0')]
[['0'], ['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9']]
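
This gives one element per partition because partitionBy sends each record to partition partitionFunc(key) % numPartitions: with count() partitions and the distinct digits 0-9 as keys, int(k) maps every key to its own partition. Elements coming off a Kafka stream usually don't have such convenient integer-like keys, so a more general sketch keys each element by its position using zipWithIndex instead (the helper name is arbitrary):

def one_file_per_element(rdd, path):
    """Save rdd so that each element ends up in its own part-XXXXX file."""
    n = rdd.count()
    if n == 0:
        return
    # Key every element by its position 0 .. n-1 ...
    indexed = rdd.zipWithIndex().map(lambda kv: (kv[1], kv[0]))
    # ... and use an identity partitioner so that element i lands in partition i.
    one_per_partition = indexed.partitionBy(n, lambda key: key)
    # Dropping the index is a narrow map, so the one-element-per-partition layout is preserved.
    one_per_partition.values().saveAsTextFile(path)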

Hope this helps.
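
For the streaming part of the question, the helper above can be dropped straight into foreachRDD. A rough sketch (kafka_stream stands in for whatever DStream the Kafka receiver returns, and the output path is only an example; each batch needs its own directory because saveAsTextFile will not overwrite an existing one):

import time

def save_batch(rdd):
    if not rdd.isEmpty():  # skip empty micro-batches
        # One directory per batch, one part file per element inside it.
        one_file_per_element(rdd, 'hdfs:///output/batch-%d' % int(time.time()))

kafka_stream.foreachRDD(save_batch)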

