Access key from mapValues or flatMapValues?

Question:

Tags: scala, apache-spark

In Spark 1.3, is there a way to access the key from mapValues?

Specifically, if I have

val y = x.groupBy(someKey)
val z = y.mapValues(someFun)

can someFun know which key of y it is currently operating on?

Or do I have to do

val y = x.map(r => (someKey(r), r)).groupBy(_._1)
val z = y.mapValues{ case (k, r) => someFun(r, k) }

Note: the reason I want to use mapValues rather than map is to preserve the partitioning.


Answer:

In this case you can use mapPartitions with the preservesPartitioning argument; mapValues itself never exposes the key to the mapping function.

x.mapPartitions(it => it.map { case (k, rr) => (k, someFun(rr, k)) }, preservesPartitioning = true)

You just have to make sure you are not changing the partitioning, i.e. don't change the key.
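A minimal runnable sketch of that idea (the sample data and someFun below are made up for illustration; mapPartitions with a preservesPartitioning argument is the standard Spark API):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object KeyAwareMapValues {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))

    // A pair RDD with an explicit partitioner, as you would have after groupByKey
    val x = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
              .groupByKey(new HashPartitioner(4))

    // Hypothetical function that needs both the grouped values and their key
    def someFun(vs: Iterable[Int], k: String): Int = vs.sum + k.length

    // mapValues hides the key; mapPartitions exposes it while keeping the
    // partitioner, as long as the keys themselves are left untouched
    val z = x.mapPartitions(
      it => it.map { case (k, vs) => (k, someFun(vs, k)) },
      preservesPartitioning = true
    )

    assert(z.partitioner == x.partitioner) // the HashPartitioner survives
    z.collect().foreach(println)
    sc.stop()
  }
}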


Related:


How to use the Akka ask pattern without blocking


scala,asynchronous,akka,future
Hi, I have an actor which is responsible for fetching data from a database, turning it into a list and sending it back to the sender. I am using the ask pattern to receive a response from my actor, because I don't want to use Await.result, since that approach will block...
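For reference, a sketch of the non-blocking style this usually calls for (the actor, message, and result types here are hypothetical): map the future returned by ask, or pipe it to another actor, instead of calling Await.result.

import akka.actor.ActorRef
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case object GetRows // hypothetical request the DB actor answers with a List[String]

def fetch(dbActor: ActorRef, replyTo: ActorRef): Unit = {
  implicit val timeout: Timeout = Timeout(5.seconds)
  val rows = (dbActor ? GetRows).mapTo[List[String]]
  rows.foreach(rs => println(s"got ${rs.size} rows")) // non-blocking callback
  rows.pipeTo(replyTo)                                // or hand the result to an actor
}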

How to unmarshall akka http request entity as string?


json,scala,akka-http
I'm trying to unmarshall request payload as string, but for some reason it's failing. My code: path("mypath") { post { decodeRequest { entity(as[String]) {jsonStr => //could not find implicit value for...FromRequestUnmarshaller[String] complete { val json: JsObject = Json.parse(jsonStr).as[JsObject] val jsObjectFuture: Future[JsObject] = MyDatabase.addListItem(json) jsObjectFuture.map(_.as[String]) } } } } } In...

Play Framework Form Error Handling


scala,playframework,playframework-2.3,playframework-2.4
This is my view file containing the form that has to filled in by the user: @helper.form(call) { @helper.input(resumeForm("surname"), '_label -> "Surname") { (id, name, value, args) => <input name="@name" type="text" value="@value" placeholder="Enter your surname"> } } This is my custom field constructor: @(elements: helper.FieldElements) @if(!elements.args.isDefinedAt('showLabel) || elements.args('showLabel) == true)...

My Scala program won't print anything


scala
Basically I'm trying to write a program that has a list of books and authors and then prints 2 things out, res1, which should print the book titles of the authors that have the name "Andrei" in it and res2 that should print the book titles that have the string...

spark-mllib: Error “reassignment to val” in source code


apache-spark,mllib
I'm using an IDEA SBT project to test spark-mllib code. Here is build.sbt: name := "SparkTest" version := "1.0" scalaVersion := "2.11.6" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.spark" %% "spark-mllib" % "1.2.0" ) After all the import and compile work was done, I found some errors in lib...

How to define a Regex in StandardTokenParsers to identify path?


regex,scala,parsing,lexical-analysis
I am writing a parser in which I want to parse arithmetic expressions like: /hdfs://xxx.xx.xx.x:xxxx/path1/file1.jpg+1 I want to parse it change the infix to postfix and do the calculation. I used helps from a part of code in another discussion as well. class InfixToPostfix extends StandardTokenParsers { import lexical._ def...

Does the DStream returned by the updateStateByKey function contain only one RDD?


apache-spark,spark-streaming,apache-spark-sql,pyspark
Does the DStream returned by the updateStateByKey function contain only one RDD? If not, under what circumstances will the DStream contain more than one RDD?

Retrieving TriangleCount


scala,apache-spark,spark-graphx
I'm trying to retrieve the number of triangles from a graph using GraphX. As I'm new to both Scala and GraphX, I'm currently quite stuck. I'm creating a graph from an edge file: 1 2 1 3 2 3 This should be 1 triangle. Next I'm using the built-in function...
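For reference, a hedged sketch of the usual recipe (the edge-list file name is made up): triangleCount requires edges in canonical orientation plus a partition strategy, and it reports per-vertex counts, so each triangle is counted three times.

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

def totalTriangles(sc: SparkContext): Long = {
  // canonicalOrientation = true enforces srcId < dstId, which triangleCount needs
  val graph = GraphLoader
    .edgeListFile(sc, "edges.txt", canonicalOrientation = true)
    .partitionBy(PartitionStrategy.RandomVertexCut)
  // every vertex counts the triangles it sits on, so divide the sum by 3
  graph.triangleCount().vertices.map(_._2.toLong).fold(0L)(_ + _) / 3
}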

SCALA: change the separator in Array


arrays,string,scala,delimiter
I have an Array like this: scala> var x=Array("a","x,y","b") x: Array[String] = Array(a, x,y, b) How do I change the separator comma in the array to a :, and finally convert it to a string like this: String = "a:x,y:b" My aim is to change the commas (separators only) to another separator (say :), so...
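If only the final string is needed, mkString already does exactly this (a minimal sketch; the elements themselves, including the embedded comma in "x,y", are untouched):

val x = Array("a", "x,y", "b")
val s = x.mkString(":") // "a:x,y:b" -- joins the elements with ':'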

Spray route get response from child actor


scala,akka,spray
I am trying to figure out how I can setup a Master Actor that calls the appropriate children, in support of some spray routes where I am trying to emulate db calls. I am new to akka / spray, so just trying to gain a better understanding of how you...

Like clause not working with int column in slick


scala,slick,slick-2.0
Slick doesn't seem to support like clauses on int columns. The following code where Status.code is of int type doesn't seem to work. Would there be a workaround for this? val query = for { s <- Status if s.code like "1%" } yield (s) ...
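A common workaround (a sketch assuming Slick 2.x, where asColumnOf is available) is to cast the Int column to a String inside the query so that like applies:

import scala.slick.driver.H2Driver.simple._

class StatusTable(tag: Tag) extends Table[(Int, String)](tag, "STATUS") {
  def code = column[Int]("CODE")
  def name = column[String]("NAME")
  def * = (code, name)
}
val statuses = TableQuery[StatusTable]

// asColumnOf[String] becomes a SQL CAST, after which `like` type-checks
val query = statuses.filter(_.code.asColumnOf[String] like "1%")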

Convert RDD[Map[String,Double]] to RDD[(String,Double)]


scala,apache-spark,rdd
I did some calculations and returned my values in an RDD containing a Scala map, and now I want to remove this map and collect all key/value pairs in an RDD. Any help will be appreciated....
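For reference, a minimal sketch: since a Map[K, V] is an Iterable of (K, V) pairs, flattening is a one-liner.

import org.apache.spark.rdd.RDD

def flatten(rdd: RDD[Map[String, Double]]): RDD[(String, Double)] =
  rdd.flatMap(m => m) // each map contributes all of its key/value pairs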

Type to impose required constrains on a double


scala,implicit-conversion
I would like to have a run-time check on a Double, without having to scatter the check all over my code. I thought that defining an implicit class would do the job, something along the lines of: implicit class Probability(val x: Double) { require(x >= 0.0 && x <= 1.0,...
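One pitfall worth noting (a sketch, reusing the question's Probability name): a value class (extends AnyVal) may not contain statements such as require in its body, so the check needs an ordinary class plus, if desired, an implicit conversion.

final case class Probability(x: Double) {
  require(x >= 0.0 && x <= 1.0, s"$x is not in [0.0, 1.0]")
}

object Probability {
  import scala.language.implicitConversions
  // Plain doubles are checked wherever a Probability is expected
  implicit def fromDouble(x: Double): Probability = Probability(x)
}

def bet(p: Probability): Double = p.x * 100
// bet(0.25) works; bet(1.5) throws IllegalArgumentException at conversion time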

PlayFramework: value as is not a member of Array[Byte]


scala,playframework
I want to make a file download from a database using the Play framework. But when I use this code I get this message: value as is not a member of Array[Byte] And if I change Ok(bytOfImage.as("image/jpg")) to Ok(bytOfImage) it works fine, but I get a file with the name secondindex without...
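For reference, the usual one-line fix (a sketch; the header is an optional extra): as is defined on the Result that Ok(...) produces, not on Array[Byte], so the content type goes after Ok. A Content-Disposition header also controls the downloaded file's name.

Ok(bytOfImage).as("image/jpg")
  .withHeaders("Content-Disposition" -> "attachment; filename=image.jpg")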

Scala first program issue


scala,recursion,case,frequency
I have just started to learn Scala after some experience with functional programming in other languages. def freq(c:Char, y:String, list:List[(Char,Int)]): List[(Char,Int)] = list match{ case _ => freq(c, y.filter(_ == c), list :: List((count(c,y),c))) case nil => list } In the above code I am getting an error when trying...

Scala rep separator for specific area of text


scala,parser-combinators
Imagine I've got the following: --open Client: enter Nick Age 28 Rosewell, USA Client: enter Maria Age 19 Cleveland, USA --open-- I need a result close to the following: List(List(Nick, Age 28, Rosewell), List(Maria, Age19, Cleveland)) There can be as many clients inside the open body as you can imagine, so the...

Scala unapplySeq extractor syntax


scala,pattern-matching,scala-2.11
I (inadvertently) came across a bit of pattern matching syntax I did not expect to compile and now cannot figure out. It appears related to unapplySeq. Note the case x List(_,_) part in this simple example: val xs = List(1, 2, 3) //> xs : List[Int] = List(1, 2, 3)...

Join files using Apache Spark / Spark SQL


java,apache-spark,apache-spark-sql
I am trying to use Apache Spark for comparing two different files based on some common field, and get the values from both files and write it as output file. I am using Spark SQL for joining both files (after storing the RDD as table). Is this the correct approach?...
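For reference, a minimal sketch of the DataFrame-join approach in the Spark 1.3 API (the file and column names here are made up):

import org.apache.spark.sql.SQLContext

def joinFiles(sqlContext: SQLContext) = {
  val left  = sqlContext.jsonFile("left.json")  // both files share an "id" field
  val right = sqlContext.jsonFile("right.json")

  left.join(right, left("id") === right("id"))
      .select(left("id"), left("name"), right("score"))
}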

Implementing map on a tree using fold


scala,haskell
I am trying to implement a map using fold. I could do so in Haskell data Tree a = EmptyTree | Node a (Tree a) (Tree a) deriving (Show) foldTree :: Tree a -> b -> (b -> a -> b -> b) -> b foldTree EmptyTree d _ =...
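A direct Scala transcription of that Haskell shape (a sketch): foldTree collapses the whole tree, and map falls out by folding with the tree constructors themselves.

sealed trait Tree[+A]
case object EmptyTree extends Tree[Nothing]
final case class Node[A](value: A, left: Tree[A], right: Tree[A]) extends Tree[A]

// Replace EmptyTree with d and every Node with f
def foldTree[A, B](t: Tree[A])(d: B)(f: (B, A, B) => B): B = t match {
  case EmptyTree     => d
  case Node(v, l, r) => f(foldTree(l)(d)(f), v, foldTree(r)(d)(f))
}

// Map via fold: rebuild the tree, transforming each value on the way
def mapTree[A, B](t: Tree[A])(g: A => B): Tree[B] =
  foldTree(t)(EmptyTree: Tree[B])((l, v, r) => Node(g(v), l, r))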

How to instantiate lexical.Scanner in a JavaTokenParsers class?


scala,parsing,lexical-scanner
I am writing a parser which inherits from JavaTokenParsers in that I have a function as follow: import scala.util.parsing.combinator.lexical._ import scala.util.parsing._ import scala.util.parsing.combinator.RegexParsers; import scala.util.parsing.combinator.syntactical.StdTokenParsers import scala.util.parsing.combinator.token.StdTokens import scala.util.parsing.combinator.lexical.StdLexical import scala.util.parsing.combinator.lexical.Scanners import scala.util.parsing.combinator.lexical.Lexical import...

Scala running issue on eclipse


eclipse,scala
I configured everything within Eclipse for Scala. I created a snippet to show you the issue: I can't see "Run as Scala application" in the run options, and I also tried to find my main class under the build configuration option, but I can't find it. How can I solve it?...

refer to scala function by name?


scala
Here is another stupid scala question regarding functions as first class objects in Scala. I'm very sorry if this is a repeat, as it probably is. In Python, Lisp, Perl, Scheme, etcetera I'm used to creating function values and assigning them names and passing them around to other functions, like...
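For reference, a short sketch of the Scala idioms: a method becomes a function value through eta-expansion (a trailing underscore, or just the bare name where a function type is expected), and function literals need no def at all.

def greet(name: String): String = s"Hello, $name"

val f = greet _                         // explicit eta-expansion: String => String
val g: String => String = greet         // the expected type triggers the expansion
val h = (name: String) => s"Hi, $name"  // a function literal, no def needed

List("Ada", "Alan").map(f)              // function values pass around like any value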

How to effectively get indices of 1s for given binary string using Scala?


scala,functional-programming,higher-order-functions
Suppose we have a binary string such as 10010010. All I want is a function returning indices of 1s for that string: indicesOfOnes("10010010") -> List(0, 3, 6) indicesOfOnes("0") -> List() And what I implemented is: def indicesOfOnes(bs: String): List[Int] = { val lb = ListBuffer[Int]() bs.zipWithIndex.foreach { case (v, i)...
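A sketch of a more direct version without the mutable buffer: collect keeps only the positions whose character matches.

def indicesOfOnes(bs: String): List[Int] =
  bs.zipWithIndex.collect { case ('1', i) => i }.toList

indicesOfOnes("10010010") // List(0, 3, 6)
indicesOfOnes("0")        // List()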

Preventing a class instantiation in Scala using Factory Pattern [duplicate]


scala,factory-pattern
This question already has an answer here: "How to check constructor arguments and throw an exception or make an assertion in a default constructor in Scala?" (2 answers). Suppose that I have the following class defined in Scala: class ClassA(val n: Int) { ... } I want to limit...
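For reference, the usual shape (a sketch with a hypothetical n > 0 constraint): make the constructor private and validate in a companion apply, so invalid instances can never exist.

class ClassA private (val n: Int) // `new ClassA(...)` only compiles in the companion

object ClassA {
  def apply(n: Int): Option[ClassA] =
    if (n > 0) Some(new ClassA(n)) else None // hypothetical constraint
}

ClassA(42) // Some(ClassA)
ClassA(-1) // None -- no exception; the caller decides how to handle it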

Is this definition of a tail recursive fibonacci function tail-recursive?


scala,f#,functional-programming,tail-recursion,continuation-passing
I've seen around the following F# definition of a continuation-passing-style fibonacci function, that I always assumed to be tail recursive: let fib k = let rec fib' k cont = match k with | 0 | 1 -> cont 1 | k -> fib' (k-1) (fun a -> fib' (k-2)...

implicit resolution for a function argument


scala,implicit,context-bound
I tried to implement mergesort in Scala. I got to the following: def mergeSort[A: Ordering](as: List[A]): List[A] = as match { case Nil => as case head :: Nil => as case _ => { val (l, r) = split(as) merge(mergeSort(l), mergeSort(r)) } } def split[A](as: List[A]): (List[A], List[A]) =...
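A sketch of the two helpers under the same A: Ordering context bound, so the implicit requested by mergeSort threads straight through to merge:

def split[A](as: List[A]): (List[A], List[A]) =
  as.splitAt(as.length / 2)

def merge[A](l: List[A], r: List[A])(implicit ord: Ordering[A]): List[A] =
  (l, r) match {
    case (Nil, _) => r
    case (_, Nil) => l
    case (lh :: lt, rh :: rt) =>
      if (ord.lteq(lh, rh)) lh :: merge(lt, r)
      else rh :: merge(l, rt)
  }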

How to generalize the round methods


scala
I have the following four methods, using BigDecimal to round a number: private def round(input: Byte, scale: Int): Byte = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).byteValue() } private def round(input: Short, scale: Int): Short = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).shortValue() } private def round(input: Int, scale: Int): Int = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).intValue() } private def...

Zipping two arrays together with index in Scala?


arrays,scala,zip
I have two arrays populated with integers. They are the same size (val array1 and val array2). I want to fuse them together into tuples with their index as the third element. For example if we have val array1 = Array(5,2,6,2) and val array2 = Array(9,8,3,4) then I want to...
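For reference, a minimal sketch: zip pairs the two arrays, zipWithIndex appends the position, and a final map flattens the nesting into triples.

val array1 = Array(5, 2, 6, 2)
val array2 = Array(9, 8, 3, 4)

val fused = array1.zip(array2).zipWithIndex.map { case ((a, b), i) => (a, b, i) }
// Array((5,9,0), (2,8,1), (6,3,2), (2,4,3))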

Scala string replacement of entire words that comply with a pattern


string,scala,scala-collections,scala-string
In a string, how to replace words that start with a given pattern ? For instance replace each word that starts with "th", with "123", val in = "this is the example, that we think of" val out = "123 is 123 example, 123 we 123 of" Namely how to...
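For reference, a one-line sketch using a word boundary: \b anchors the match at the start of a word and \w* consumes the rest of it.

val in  = "this is the example, that we think of"
val out = in.replaceAll("""\bth\w*""", "123")
// "123 is 123 example, 123 we 123 of"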

Implicit Generic.Aux missing on conversion from Shapeless HList to case class


scala,shapeless,type-level-computation
I just recently started learning scala and today I decided I wanted to write a CSV parser that would load nicely into case classes but store the data in rows (lists) of Shapeless's HList object so that I could get some exposure to type-level programming. Here's what I have so...

How do I flatMap a row of arrays into multiple rows?


apache-spark,apache-spark-sql
After parsing some jsons I have a one-column DataFrame of arrays scala> val jj =sqlContext.jsonFile("/home/aahu/jj2.json") res68: org.apache.spark.sql.DataFrame = [r: array<bigint>] scala> jj.first() res69: org.apache.spark.sql.Row = [List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)] I'd like to explode each row out into several rows. How? edit: Original json file:...
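For reference, a hedged sketch against the Spark 1.3 API (assuming the array sits in column 0, as shown): drop to the underlying RDD and flatMap each row's array into individual records.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def explodeRows(jj: DataFrame): RDD[Long] =
  jj.rdd.flatMap(row => row.getAs[Seq[Long]](0)) // one output record per element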

Collapse similar case statements in Scala


scala,functional-programming,pattern-matching
Is there an elegant way to do something like the following example using just one case statement? foobar match { case Node(Leaf(key, value), parent, qux) => { // Do something with parent & qux } case Node(parent, Leaf(key, value), qux) => { // Do something with parent & qux (code...

Scala (Slick) HList splitting to case classes


scala,slick
currently I have a HList with more than 22 fields and now I want to split it to 2-3 case classes, is there an easy functional way to do it? Currently I use the following syntax: CaseClass1(c.head, c.tail.head, c.tail.tail.head, etc...) However that doesn't seem to be right since I have...

Spray microservice assembly deduplicate


scala,sbt,akka,spray,microservices
I'm using this template to develop a microservice: http://www.typesafe.com/activator/template/activator-service-container-tutorial My sbt file is like this: import sbt._ import Keys._ name := "activator-service-container-tutorial" version := "1.0.1" scalaVersion := "2.11.6" crossScalaVersions := Seq("2.10.5", "2.11.6") resolvers += "Scalaz Bintray Repo" at "https://dl.bintray.com/scalaz/releases" libraryDependencies ++= { val containerVersion = "1.0.1" val configVersion = "1.2.1"...

Scala - Option Type Var Manipulation


scala,scala-option
I am working on an online exercise practicing Options and threads, both of which I have very little experience. The online exercise comes with a test suite, so right now I am trying to get my Option test cases to pass before I move on to the thread test cases....

ZipList with Scalaz


list,scala,scalaz,applicative
Suppose I have a list of numbers and list of functions to apply to numbers: val xs: List[Int] = List(1, 2, 3) val fs: List[Int => Int] = List(f1, f2, f3) Now I would like to use an Applicative to apply f1 to 1, f2 to 2, etc. val ys:...
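Without committing to the Scalaz encoding (its default List Applicative takes the Cartesian product rather than zipping), a plain-Scala sketch of the zip-wise application:

val xs: List[Int] = List(1, 2, 3)
val fs: List[Int => Int] = List(_ + 1, _ * 10, _ - 3)

// Pair each function with its argument, then apply: List(2, 20, 0)
val ys: List[Int] = (fs, xs).zipped.map((f, x) => f(x))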

Future yielding with flatMap


scala
Given Futures fa, fb, fc, I can use f: Function1[(A,B,C), Future[D]], to return a Future[D] either by: (for { a <- fa b <- fb c <- fc } yield (a,b,c)).flatMap(f) which has the unenviable property of declaring the variables a,b,c twice. or a.zip(b).zip(c).flatMap{ case (a, (b, c)) => f(a,...
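A sketch of the variant usually suggested (with f uncurried to (A, B, C) => Future[D] for readability): keep the for comprehension, but flatMap inside it so nothing is named twice.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def combine[A, B, C, D](fa: Future[A], fb: Future[B], fc: Future[C])
                       (f: (A, B, C) => Future[D]): Future[D] =
  for {
    a <- fa
    b <- fb
    c <- fc
    d <- f(a, b, c) // the final flatMap happens inside the comprehension
  } yield d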

Operand order in Scala List.prepend (::)


list,scala,operators
Odersky has brilliantly optimized Java syntax, enabling object calls without dots and parentheses. I.e. instead of list.prepend(item), you now simply write list :: item, which also turns language operators into simple object methods. Here, List defines the :: (prepend) operator. However, you normally write it vice versa in Scala, using item ::...
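For reference, a small sketch: :: is right-associative because its name ends in a colon, so the receiver is on the right-hand side.

val list = List(2, 3)

val a = 1 :: list  // sugar for list.::(1): operators ending in ':' bind to the right
val b = list.::(1) // the explicit call -- same result: List(1, 2, 3)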

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver


cassandra,apache-spark,apache-spark-sql,spark-jobserver,spark-cassandra-connector
So I'm trying to run a job that simply runs a query against Cassandra using Spark SQL; the job is submitted fine and starts fine. This code works when it is not being run through Spark Jobserver (when simply using spark-submit). Could someone tell me what is wrong with...

Include package in Spark local mode


python,apache-spark,py.test,pyspark
I'm writing some unit tests for my Spark code in python. My code depends on spark-csv. In production I use spark-submit --packages com.databricks:spark-csv_2.10:1.0.3 to submit my python script. I'm using pytest to run my tests with Spark in local mode: conf = SparkConf().setAppName('myapp').setMaster('local[1]') sc = SparkContext(conf=conf) My question is, since...

Scodec: Coproducts could not find implicit value for parameter auto: scodec.codecs.CoproductBuilderAuto


scala,scodec
I am trying to define an Scodec coproduct codec for communicating with an EELink GPS. Here is the code: import scodec.Codec import scodec.bits.ByteVector import scodec.codecs._ trait Message object Message { implicit val discriminated: Discriminated[ Message, Int ] = Discriminated(uint8) val codec: Codec[ Message ] = Codec.coproduct[ Message ].discriminatedByIndex(uint8) } case...

Pyspark: using filter for feature selection


python,apache-spark,pyspark
I have an array of dimensions 500 x 26. Using the filter operation in pyspark, I'd like to pick out the columns which are listed in another array at row i. Ex: if a[i]= [1 2 3] Then pick out columns 1, 2 and 3 and all rows. Can this...

Passing a function foreach key of an Array


scala,apache-spark,scala-collections,spark-graphx
I have an array like that : val pairs: Array[(Int, ((VertexId, Seq[Int]), Int))] which generates this output : (11,((11,ArraySeq(2, 5, 4, 5)),1)) (11,((12,ArraySeq(7, 7, 8, 2)),1)) (11,((13,ArraySeq(5, 9, 8, 7)),1)) (1,((1,ArraySeq(1, 2, 3, 4)),1)) (1,((4,ArraySeq(1, 5, 1, 1)),1)) I want to build a Graph for each pairs._1. That means for...

Scala slf4j dynamic file name


scala,logging,slf4j
I just successfully added Grizzled-SLF4J logger to my project using this link http://alvinalexander.com/scala/how-to-log-output-file-grizzled-slf4j-scala-simplelogger.properties But using this properties, there is no option to create dynamic file name: org.slf4j.simpleLogger.logFile = /tmp/myapp.log org.slf4j.simpleLogger.defaultLogLevel = info org.slf4j.simpleLogger.showDateTime = true org.slf4j.simpleLogger.dateTimeFormat = yyyy'/'MM'/'dd' 'HH':'mm':'ss'-'S org.slf4j.simpleLogger.showThreadName = true...

How to extract application ID from the PySpark context


apache-spark,yarn,pyspark
A previous question recommends sc.applicationId, but it is not present in PySpark, only in Scala. So, how do I figure out the application ID (for YARN) of my PySpark process?...