Spark RDD equivalent to Scala collections partition

Question:

Tag: scala,apache-spark,scala-collections

This is a minor issue with one of my Spark jobs; it doesn't seem to cause any real problems, yet it annoys me every time I see it and I fail to come up with a better solution.

Say I have a Scala collection like this:

import scala.util.Try

val myStuff = List(Try(2/2), Try(2/0))

I can partition this list into successes and failures with partition:

val (successes, failures) = myStuff.partition(_.isSuccess)
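With the sample list that yields, roughly:

// successes: List(Success(1))
// failures:  List(Failure(java.lang.ArithmeticException: / by zero))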

Which is nice. The implementation of partition only traverses the source collection once to build the two new collections. However, using Spark, the best equivalent I have been able to devise is this:

val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }

Aside from the difference of unpacking the Try (which is fine), this also requires traversing the data twice, which is annoying.

Is there any better Spark alternative that can split an RDD without multiple traversals? i.e. something with a signature like this, where partition behaves like the Scala collections partition rather than RDD partitioning:

val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
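(For illustration only, the closest workaround I can sketch is a small helper that caches the RDD and filters it twice; the name below is hypothetical and not part of Spark's API. Caching means the potentially expensive upstream map runs only once, but it is still two passes over the cached data rather than one.)

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.util.Try

// Hypothetical helper: cache the RDD so the upstream computation runs once,
// then filter the cached data twice to split successes from failures.
def partitionTries[A](rdd: RDD[Try[A]]): (RDD[Try[A]], RDD[Try[A]]) = {
  val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
  (cached.filter(_.isSuccess), cached.filter(_.isFailure))
}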

For reference, I previously used something like the code below to solve this. The potentially failing operation is deserializing some data from a binary format, and the failures have become interesting enough that they need to be processed and saved as an RDD rather than just logged.

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
  try {
    Some(deserialize(data))
  } catch {
    case e: MyDeserializationError =>
      logger.error(e)
      None
  }
}
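(With that Option-based version the RDD is typically built with flatMap, which silently drops the failed records. A minimal sketch, reusing the hypothetical sourceRDD and MyDataType from above:)

// None results (the failures) disappear here; only the log line remains.
val parsed: RDD[MyDataType] = sourceRDD.flatMap(bytes => someOperationThatMayFail(bytes).toList)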

Answer:

There might be other solutions, but here you go:

Setup:

import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)

Execution:

val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](), List[Try[Int]]()))(
  (accum, curr) => if (curr.isSuccess) (curr :: accum._1, accum._2) else (accum._1, curr :: accum._2),
  (first, second) => (first._1 ++ second._1, first._2 ++ second._2))
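Note that aggregate folds the RDD down to a single local pair of plain Lists on the driver rather than two RDDs, so it only suits results small enough to collect. With the sample data it prints something like:

println(myStuffInSparkPartitioned)
// (List(Success(1)), List(Failure(java.lang.ArithmeticException: / by zero)))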

Let me know if you need an explanation.

