
Spark - How to count number of records by key


Tag: hadoop,apache-spark,cloud

This is probably an easy problem, but basically I have a dataset where I need to count the number of females for each country. Ultimately I want to group each count by country, but I am unsure of what to use for the value, since there is no count column in the dataset that I can use as the value in a groupByKey or reduceByKey. I thought of using reduceByKey(), but that requires a key-value pair and I only want to count the key and make a counter the value. How do I go about this?

val lines = sc.textFile("/home/cloudera/desktop/file.txt")
val split_lines = lines.map(_.split(","))
val femaleOnly = split_lines.filter(x => x._10 == "Female")

Here is where I am stuck. The country is at index 13 in the dataset. The output should look something like this: (Australia, 201000) (America, 420000) etc. Any help would be great. Thanks


You're nearly there! All you need is a countByValue:

val countOfFemalesByCountry = femaleOnly.map(_(13)).countByValue()
// Returns a Map to the driver, e.g. (Australia, 230), (America, 23242), etc.

(In your example, I assume you meant x(10) rather than x._10)

All together:

    sc.textFile("/home/cloudera/desktop/file.txt")
      .map(_.split(","))
      .filter(x => x(10) == "Female")
      .map(_(13))
      .countByValue()
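
If you would rather keep the counts distributed as an RDD instead of collecting a Map to the driver (which is what countByValue does), the reduceByKey approach hinted at in the question works too. A minimal sketch, assuming the same column indexes (10 for gender, 13 for country):

// Sketch: same counting logic, but staying in the RDD with reduceByKey.
val countsRdd = split_lines
  .filter(x => x(10) == "Female")
  .map(x => (x(13), 1L))               // (country, 1) pairs
  .reduceByKey(_ + _)                  // sum the ones per country
countsRdd.collect().foreach(println)   // e.g. (Australia,230), (America,23242)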


spark-mllib: Error “reassignment to val” in source code

I'm using an IDEA SBT project to test spark-mllib code. Here is my build.sbt: name := "SparkTest" version := "1.0" scalaVersion := "2.11.6" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.spark" %% "spark-mllib" % "1.2.0" ) After all the import and compile work was done, I found some errors in lib...

Spark streaming transform function

I am having compilation errors in the transform function for Spark Streaming. Specifically, I seem to be missing finalizing the DStream variable or something similar. I have copied from the AMPLab tutorials so I'm slightly confused... Here is the code, the problem is in the transform function towards the end. Here is...

Access key from mapValues or flatMapValues?

In Spark 1.3, is there a way to access the key from mapValues? Specifically, if I have val y = x.groupBy(someKey) val z = y.mapValues(someFun) can someFun know which key of y it is currently operating on? Or do I have to do val y = x.map(r => (someKey(r), r)).groupBy(_._1)...
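
A common workaround, sketched below with hypothetical example data: groupBy keeps the key in the resulting pair, so you can map over the whole (key, values) pair instead of using mapValues, and the function then sees the key.

// Sketch: map over the whole (key, values) pair so the function can see the key.
// The example data and grouping function here are hypothetical.
val x = sc.parallelize(Seq("apple", "avocado", "banana"))
val y = x.groupBy(word => word.head)            // RDD[(Char, Iterable[String])]
val z = y.map { case (k, vs) => (k, vs.size) }  // k (the key) is available here
z.collect().foreach(println)                    // e.g. (a,2), (b,1)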

SQL Server 2012 & Polybase - 'Hadoop Connectivity' configuration option missing

As described in the title, I am using SQL Server 2012 Parallel Data Warehouse with the Polybase feature to try to access an HDInsight Hadoop cluster. As a starting point for every connection to Hadoop from SQL Server, I need to execute the command sp_configure @configname = 'hadoop connectivity', @configvalue =...

Profiling a scala spark application

I would like to profile my Spark Scala applications to figure out the parts of the code which I have to optimize. I enabled -Xprof in --driver-java-options, but this is not of much help to me as it gives a lot of granular details. I am just interested to know how...

Flink error - org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

I am trying to run a flink job using a file from HDFS. I have created a dataset as following - DataSource<Tuple2<LongWritable, Text>> visits = env.readHadoopFile(new TextInputFormat(), LongWritable.class,Text.class, Config.pathToVisits()); I am using flink's latest version - 0.9.0-milestone-1-hadoop1 (I have also tried with 0.9.0-milestone-1) whereas my Hadoop version is 2.6.0 But,...

Vertica: Input record 1 has been rejected (Too few columns found)

I am trying to copy a file from Hadoop to a Vertica table and I get an error. The problem is that the same copy sometimes passes and sometimes fails, any idea? The Error: Caused by: java.sql.SQLException: [Vertica]VJDBC ERROR: COPY: Input record 1 has been rejected (Too few columns found) at com.vertica.util.ServerErrorData.buildException(Unknown Source)...

hadoop complains about attempting to overwrite nonempty destination directory

I'm following Rasesh Mori's instructions to install Hadoop on a multinode cluster, and have gotten to the point where jps shows the various nodes are up and running. I can copy files into hdfs; I did so with $HADOOP_HOME/bin/hdfs dfs -put ~/in /in and then tried to run the wordcount...

how to drop partition metadata from hive, when partition is drop by using alter drop command

I have dropped all the partitions in the Hive table by using the alter command alter table emp drop partition (hiredate>'0'); After dropping the partitions I can still see the partition metadata. How do I delete this partition metadata? Can I use the same table for new partitions? ...

Hadoop map reduce Extract specific columns from csv file in csv format

I am new to Hadoop and working on a big data project where I have to clean and filter a given CSV file. For example, if the given CSV file has 200 columns, then I need to select only 20 specific columns (so-called data filtering) as output for further operations. Also...

How to extract application ID from the PySpark context

A previous question recommends sc.applicationId, but it is not present in PySpark, only in Scala. So, how do I figure out the application ID (for YARN) of my PySpark process?...

Pyspark: using filter for feature selection

I have an array of dimensions 500 x 26. Using the filter operation in pyspark, I'd like to pick out the columns which are listed in another array at row i. Ex: if a[i]= [1 2 3] Then pick out columns 1, 2 and 3 and all rows. Can this...

Install SparkR that comes with Spark 1.4

The newest version of Spark (1.4) now comes with SparkR. Does anyone know how to go about installing the SparkR implementation on Windows? The sparkR.R script is currently located in C:/spark-1.4.0/R/pkgs/R/ This appears to be a step in the right direction, but the instructions don't work for Windows as there...

Which spark MLIB algorithm to use?

I'm a newbie to machine learning and would like to understand which algorithm (a classification algorithm or a correlation algorithm?) to use in order to understand the relationship between one or more attributes. For example, consider that I have the following set of attributes: Bill No, Bill Amount, Tip amount, Waiter Name and...

Graphx EdgeRDD count taking long time to compute

I am running standalone Spark, and I have this code below related to EdgeRDD. These are graph edges loaded from a text file. There are around 67 million records. val edges: RDD[Edge[Int]] = edge_file.map(line => {val x = line.split("\\s+") Edge(x(0).toLong, x(1).toLong, x(2).toInt); }) val edges1: EdgeRDD[Int] = EdgeRDD.fromEdges(edges) println(edges1.count) The...

How to un-nest a spark rdd that has the following type ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]]))

It's a nested map with contents like this when I print it to the screen (5, Map ( "ABCD" -> Map("3200" -> 3, "3350.800" -> 4, "200.300" -> 3) (1, Map ( "DEF" -> Map("1200" -> 32, "1320.800" -> 4, "2100" -> 3) I need to get something like this Case...
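
Since the desired output above is cut off, here is only a generic flattening sketch for the RDD type in the title, with hypothetical contents: flatMap through both levels of the nested map.

// Sketch: flatten RDD[(String, Map[String, Map[String, Int]])] into
// one tuple per innermost entry: (outerKey, middleKey, innerKey, count).
val nested = sc.parallelize(Seq(
  ("5", Map("ABCD" -> Map("3200" -> 3, "3350.800" -> 4, "200.300" -> 3)))))
val flat = nested.flatMap { case (outer, mid) =>
  mid.flatMap { case (midKey, inner) =>
    inner.map { case (innerKey, n) => (outer, midKey, innerKey, n) }
  }
}
flat.collect().foreach(println) // (5,ABCD,3200,3), (5,ABCD,3350.800,4), ...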

Hadoop append data to hdfs file and ignore duplicate entries

How can I append data to HDFS files and ignore duplicate values? I have a huge HDFS file (MainFile) and I have 2 other new files from different sources, and I want to append data from these files to the MainFile. The main file and the other files have the same structure....

JMH Benchmark on Hadoop YARN

I have written a JMH benchmark for my MapReduce job. If I run my app in local mode, it works, but when I run it with the yarn script on my hadoop cluster, then I get the following error: [[email protected] Desktop]$ ./launch_mapreduce.sh # JMH 1.10 (released 5 days ago) #...

Removing duplicates from Spark RDDPair values

I am new to Python and also Spark. I have a pair RDD containing (key, List), but some of the values are duplicates. The RDD is of the form (zipCode, streets). I want a pair RDD which does not contain duplicates. I am trying to achieve it using Python. Can anyone please help...

OutofMemoryErrory creating fat jar with sbt assembly

We are trying to make a fat jar file containing one small scala source file and a ton of dependencies (simple mapreduce example using spark and cassandra): import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import com.datastax.spark.connector._ import org.apache.spark.SparkConf object VMProcessProject { def main(args: Array[String]) { val conf = new SparkConf() .set("spark.cassandra.connection.host", "") .set("spark.executor.extraClassPath",...

issue monitoring hadoop response

I am using Ganglia to monitor Hadoop. gmond and gmetad are running fine. When I telnet to the gmond port (8649) and when I telnet to gmetad on its XML answer port, I get no Hadoop data. How can that be? cluster { name = "my cluster" owner = "Master" latlong...

How to run hadoop application automatically?

I know that a MapReduce program can be run once using the command line "hadoop jar *.jar". But now the program is required to run once every hour in the background. Are there any methods to have the MR program submitted to Hadoop automatically every hour?...

PySpark No suitable driver found for jdbc:mysql://dbhost

I am trying to write my dataframe to a MySQL table. I am getting No suitable driver found for jdbc:mysql://dbhost when I try to write. As part of the preprocessing I read from other tables in the same DB and have no issues doing that. I can do the full run...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]

I did some calculations and returned my values in an RDD containing a Scala Map, and now I want to remove this Map and collect all the keys and values in an RDD. Any help will be appreciated....
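
A minimal sketch of the usual approach, flattening each Map into its key-value pairs with flatMap (the example data here is hypothetical):

// Sketch: RDD[Map[String, Double]] -> RDD[(String, Double)] via flatMap.
val maps = sc.parallelize(Seq(Map("a" -> 1.0, "b" -> 2.0), Map("c" -> 3.0)))
val pairs = maps.flatMap(m => m.toSeq)   // each map contributes its (key, value) pairs
pairs.collect().foreach(println)         // (a,1.0), (b,2.0), (c,3.0)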

Count of second dimension in two dimension data in Spark

I have the data in this format (apple, laptop) (apple, laptop) (apple, ipad) (dell, laptop) I want the output to be (apple, laptop, 2) (apple, ipad, 1) (dell, laptop, 1) I wanted to do this using groupBy and then count, but groupBy does not allow grouping based on two columns....
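
One common pattern, sketched below with the sample pairs from the question: treat the whole tuple as a composite key, count with reduceByKey, then map back out to triples.

// Sketch: count occurrences of (col1, col2) pairs via a composite key.
val data = sc.parallelize(Seq(
  ("apple", "laptop"), ("apple", "laptop"), ("apple", "ipad"), ("dell", "laptop")))
val counted = data
  .map(pair => (pair, 1))                  // ((apple,laptop), 1), ...
  .reduceByKey(_ + _)                      // sum per composite key
  .map { case ((a, b), n) => (a, b, n) }   // (apple, laptop, 2), ...
counted.collect().foreach(println)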

Spark executors with different amounts of memory on Mesos

Is it possible to have executors with different amounts of memory on a Mesos cluster? Or am I bound by the machine with the least memory? (Assuming I want to use all available CPUs.)

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver

So I'm trying to run a job that simply runs a query against Cassandra using Spark SQL. The job is submitted fine and it starts fine. This code works when it is not being run through Spark Jobserver (when simply using spark-submit). Could someone tell me what is wrong with...

Pyspark StructType is not defined

I'm trying to construct a schema for DB testing, and StructType apparently isn't working for some reason. I'm following a tutorial, and it doesn't import any extra module. <type 'exceptions.NameError'>, NameError("name 'StructType' is not defined",), <traceback object at 0x2b555f0>) I'm on Spark 1.4.0, and Ubuntu 12 if that has anything...

Spark on yarn jar upload problems

I am trying to run a simple Map/Reduce java program using spark over yarn (Cloudera Hadoop 5.2 on CentOS). I have tried this 2 different ways. The first way is the following: YARN_CONF_DIR=/usr/lib/hadoop-yarn/etc/hadoop/; /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class MRContainer --master yarn-cluster --jars /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar simplemr.jar This method gives the following error: diagnostics: Application application_1434177111261_0007...

In sbt, how can we specify the version of hadoop on which spark depends?

Well, I have an sbt project which uses Spark and Spark SQL, but my cluster uses Hadoop 1.0.4 and Spark 1.2 with Spark SQL 1.2. Currently my build.sbt looks like this: libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", "com.datastax.cassandra" % "cassandra-driver-mapping" % "2.1.5", "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.2.1", "org.apache.spark" %...
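
One commonly used approach, sketched below under the assumption that the Spark artifacts pull in an unwanted hadoop-client: exclude Hadoop from the Spark dependencies and declare the cluster's version explicitly.

// Sketch (build.sbt): pin the Hadoop version the build resolves.
// Versions mirror the question; adjust them to your cluster.
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.2.0")
    .exclude("org.apache.hadoop", "hadoop-client"),
  ("org.apache.spark" %% "spark-sql" % "1.2.0")
    .exclude("org.apache.hadoop", "hadoop-client"),
  "org.apache.hadoop" % "hadoop-client" % "1.0.4"
)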

Importtsv command gives : Container exited with a non-zero exit code 1 error

I am trying to load a tsv file into an existing hbase table. I am using the following command: /usr/local/hbase/bin$ hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,cf:value '-Dtable_name.separator=\t' Table-name /hdfs-path-to-input-file But when I execute the above command, I get the following error Container id: container_1434304449478_0018_02_000001 Exit code: 1 Stack trace: ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)...

Retrieving TriangleCount

I'm trying to retrieve the number of triangles from a graph using GraphX. As I'm new to both Scala and GraphX, I'm currently quite stuck. I'm creating a graph from an edge file: 1 2 1 3 2 3 This should be 1 triangle. Next I'm using the built-in function...

Join files using Apache Spark / Spark SQL

I am trying to use Apache Spark for comparing two different files based on some common field, and get the values from both files and write it as output file. I am using Spark SQL for joining both files (after storing the RDD as table). Is this the correct approach?...

Merging two columns into a single column and formatting the content to form an accurate date-time format in Hive?

These are the 2 columns (month, year). I want to create a single column out of them having an accurate date-time format ('YYYY-MM-DD HH:MM:SS') and add it as a new column to the table. Month year 12/ 3 2013 at 8:40pm 12/ 3 2013 at 8:39pm 12/ 3 2013 at 8:39pm 12/ 3 2013 at...

Spark-submit class not found exception

I am trying to run this simple spark application with the spark submit command using this quick start tutorial. http://spark.apache.org/docs/1.2.0/quick-start.html#self-contained-applications. when I try to run it using spark-1.4.0-bin-hadoop2.6\bin>spark-submit --class " SimpleApp" --master local[4] C:/.../Documents/Sparkapp/target/scala- 2.10/simple-project_2.10-1.0.jar I get the following exception: java.lang.ClassNotFoundException: SimpleApp at java.net.URLClassLoader$1.run(Unknown Source) at java.net.URLClassLoader$1.run(Unknown...

Include package in Spark local mode

I'm writing some unit tests for my Spark code in python. My code depends on spark-csv. In production I use spark-submit --packages com.databricks:spark-csv_2.10:1.0.3 to submit my python script. I'm using pytest to run my tests with Spark in local mode: conf = SparkConf().setAppName('myapp').setMaster('local[1]') sc = SparkContext(conf=conf) My question is, since...

Spark: use reduceByKey instead of groupByKey and mapByValues

I have an RDD with duplicate values in the following format: [ {key1: A}, {key1: A}, {key1: B}, {key1: C}, {key2: B}, {key2: B}, {key2: D}, ..] I would like the new RDD to have the following output and to get rid of duplicates. [ {key1: [A,B,C]}, {key2: [B,D]}, ..]...
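
A sketch of that idea in Scala (the excerpt shows Python-style dicts, but the pattern is the same): wrap each value in a Set and merge the sets with reduceByKey, which deduplicates while avoiding groupByKey.

// Sketch: deduplicate values per key using reduceByKey over Sets.
val rdd = sc.parallelize(Seq(
  ("key1", "A"), ("key1", "A"), ("key1", "B"), ("key1", "C"),
  ("key2", "B"), ("key2", "B"), ("key2", "D")))
val deduped = rdd
  .map { case (k, v) => (k, Set(v)) }   // each value becomes a singleton set
  .reduceByKey(_ ++ _)                  // set union drops duplicates
deduped.collect().foreach(println)      // (key1,Set(A, B, C)), (key2,Set(B, D))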

Shuffled vs non-shuffled coalesce in Apache Spark

What is the difference between the following transformations when they are executed right before writing RDD to a file? coalesce(1, shuffle = true) coalesce(1, shuffle = false) Code example: val input = sc.textFile(inputFile) val filtered = input.filter(doSomeFiltering) val mapped = filtered.map(doSomeMapping) mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile) vs mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)...

Does the DStream returned by the updateStateByKey function contain only one RDD?

Does the DStream returned by the updateStateByKey function contain only one RDD? If not, under what circumstances will the DStream contain more than one RDD?

HIVE: apply delimiter until a specified column

I am trying to move data from a file into a Hive table. The data in the file looks something like this:- StringA StringB StringC StringD StringE where each string is separated by a space. The problem is that I want separate columns for StringA, StringB and StringC and one...

Create an external Hive table from an existing external table

I have a set of CSV files in a HDFS path and I created an external Hive table, let's say table_A, from these files. Since some of the entries are redundant, I tried creating another Hive table based on table_A, say table_B, which has distinct records. I was able to...

Input of the reduce phase is not what I expect in Hadoop (Java)

I'm working on a very simple graph analysis tool in Hadoop using MapReduce. I have a graph that looks like the following (each row represents an edge - in fact, this is a triangle graph): 1 3 3 1 3 2 2 3 Now, I want to use MapReduce to...

How to insert and Update simultaneously to PostgreSQL with sqoop command

I am trying to insert into a PostgreSQL DB with the sqoop command. sqoop export --connect jdbc:postgresql:// --table table1 --username user1 --password pass1 --export-dir /hivetables/table/ --fields-terminated-by '|' --lines-terminated-by '\n' -- --schema schema It is working fine if there is no primary key constraint. I want to insert new records and update old records...

How do I flatMap a row of arrays into multiple rows?

After parsing some jsons I have a one-column DataFrame of arrays scala> val jj =sqlContext.jsonFile("/home/aahu/jj2.json") res68: org.apache.spark.sql.DataFrame = [r: array<bigint>] scala> jj.first() res69: org.apache.spark.sql.Row = [List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)] I'd like to explode each row out into several rows. How? edit: Original json file:...
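
One way to do it at the RDD level, sketched under the assumption that column 0 of jj is the array<bigint> shown above (the DataFrame explode API is another option):

// Sketch: one output element per array entry, one array per input Row.
val exploded = jj.flatMap(row => row.getAs[Seq[Long]](0))  // RDD[Long]
exploded.take(5).foreach(println)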

Sqoop Export with Missing Data

I am trying to use Sqoop to export data from HDFS into Postgresql. However, I receive an error partially through the export that it can't parse the input. I manually went into the file I was exporting and saw that this row had two columns missing. I have tried a...

Hive external table not reading entirety of string from CSV source

Relatively new to the Hadoop world so apologies if this is a no-brainer but I haven't found anything on this on SO or elsewhere. In short, I have an external table created in Hive that reads data from a folder of CSV files in HDFS. The issue is that while...

Why we are configuring mapred.job.tracker in YARN?

What I know is that YARN was introduced and it replaced the JobTracker and TaskTracker. I have seen some Hadoop 2.6.0/2.7.0 installation tutorials and they configure mapreduce.framework.name as yarn and the mapred.job.tracker property as local or host:port. The description for the mapred.job.tracker property is "The host and port that the MapReduce job...

Passing a function foreach key of an Array

I have an array like this: val pairs: Array[(Int, ((VertexId, Seq[Int]), Int))] which generates this output: (11,((11,ArraySeq(2, 5, 4, 5)),1)) (11,((12,ArraySeq(7, 7, 8, 2)),1)) (11,((13,ArraySeq(5, 9, 8, 7)),1)) (1,((1,ArraySeq(1, 2, 3, 4)),1)) (1,((4,ArraySeq(1, 5, 1, 1)),1)) I want to build a Graph for each pairs._1. That means for...

How to get rid of “Spark assembly has been built with Hive, including Datanucleus jars on classpath” message?

I am running an Apache Spark app as a cron job, but I keep getting emails with the message "Spark assembly has been built with Hive, including Datanucleus jars on classpath". My cron entry is something like the following ...home/sparkJob.sh > /home/SaprkJobOperation-`date +\%Y\%m\%d\%H`-cron.log I understand that there are...