multithreading,scala,web-crawler,akka,actor , Workload balancing between akka actors


Workload balancing between akka actors

Question:

Tag: multithreading,scala,web-crawler,akka,actor

I have 2 akka actors used for crawling links, i.e. find all links in page X, then find all links in all pages linked from X, etc...

I want them to progress more or less at the same pace, but more often than not one of them becomes starved and another one consumes all resources.

I've tried following approaches (simplified). Single page crawling is done by the following actor:

class Crawler extends Actor {
  def receive = {
    case Crawl(url, kind) =>
      // download url
      // extract links
      sender ! Parsed(url, links, kind)
  }
}

Approach 1:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        val crawler = context.actorOf(Props[Crawler])
        crawler ! Crawl(link, kind)
      }
  }
}

Approach 2:

class Coordinator extends Actor {
  val linksA = ...
  val linksB = ...
  val rrProps = Props[Crawler].withRouter(RoundRobinRouter(nrOfInstances = 10)
  val crawlerA = context.actorOf(rrProps)
  val crawlerB = context.actorOf(rrProps)
  def receive = {
    case Parsed(url, links, kind) =>
      val store = if (kind == kindA) linksA else linksB
      val newLinks = links -- store
      store ++= links
      newLinks.foreach { link =>
        if (kind == kindA) crawlerA ! Crawl(link, kind)
        else crawlerB ! Crawl(link, kind)
      }
  }
}

Second approach made things slightly better, but didn't fix it whole.

Is there a good way to make crawlers of both kinds progress more or less at the same pace? Should I send messages between them unblocking each other in turn?


Answer:

I'm working on a similar program where the workers have a non-uniform resource cost (in my case the task is performing database queries and dumping the results in another database, but just as crawling different websites will have different costs so too will different queries have different costs). Two ways of dealing with this that I've employed:

  1. Replace the RoundRobinRouter with a SmallestMailboxRouter
  2. Don't have the Coordinator send out all of its messages at once - instead send them out in batches, in your case you have ten workers so sending out forty messages should keep them busy initially. Whenever a worker completes a task it sends a message to the Coordinator, at which point the Coordinator sends out another message that will probably go to the worker that just completed its task. (You can also do this in batches, i.e. after receiving n "task complete" messages the Coordinator sends out another n messages, but don't make n too high or else some workers with extremely short tasks may be idle.)

A third option is to cheat and share a ConcurrentLinkedQueue between all actors: after filling the queue the Coordinator sends a "start" message to the workers, and the workers then poll the queue til it's empty.


Related:


java multithreading start() and run() [duplicate]


java,multithreading
This question already has an answer here: Java: What's the difference between Thread start() and Runnable run() 10 answers Below is my Multithreading class: public class Multithreading extends Thread{ public void run(){ for(int i=1;i<5;i++){ try{ Thread.sleep(500); }catch(InterruptedException e){ System.out.println(e); } System.out.println(i); } } public static void main(String args[]) {...

Spray microservice assembly deduplicate


scala,sbt,akka,spray,microservices
I'm using this template to develop a microservice: http://www.typesafe.com/activator/template/activator-service-container-tutorial My sbt file is like this: import sbt._ import Keys._ name := "activator-service-container-tutorial" version := "1.0.1" scalaVersion := "2.11.6" crossScalaVersions := Seq("2.10.5", "2.11.6") resolvers += "Scalaz Bintray Repo" at "https://dl.bintray.com/scalaz/releases" libraryDependencies ++= { val containerVersion = "1.0.1" val configVersion = "1.2.1"...

Scala rep separator for specific area of text


scala,parser-combinators
Imaging i've got following: --open Client: enter Nick Age 28 Rosewell, USA Client: enter Maria Age 19 Cleveland, USA --open-- I need a result close to the following: List(List(Nick, Age 28, Rosewell), List(Maria, Age19, Cleveland)) It can be as many clients inside open body as you can imagine, so the...

PlayFramework: value as is not a member of Array[Byte]


scala,playframework
I want to make file download from a database using Play framework. But when I use this code I get this message: value as is not a member of Array[Byte] And if I change Ok(bytOfImage.as("image/jpg")) to Ok(bytOfImage) it works good but I get a file with a name: secondindex without...

Does wait() need synchronization on local variable


java,multithreading,synchronization
I had this code (which was working fine): public static void runOnUiThread(Activity c, final Runnable action) { // Check if we are on the UI Thread if (Looper.getMainLooper() == Looper.myLooper()) { // If we are, execute immediately action.run(); return; } // Else run the runnable on the UI Thread and...

performance issues executing list of stored procedures


c#,multithreading,performance,loops
I'm having some performance issues when starting my windows service, the first round my lstSps is long (about 130 stored procedures). Is there anyway to speed this up (except for speeding the stored procedures up)? When the foreach is over and goes over to the second round it goes faster,...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]


scala,apache-spark,rdd
I did some calculation and returned my values in a RDD containing scala map and now I want to remove this map and want to collect all keys values in a RDD. Any help will be appreciated....

Scala - Option Type Var Manipulation


scala,scala-option
I am working on an online exercise practicing Options and threads, both of which I have very little experience. The online exercise comes with a test suite, so right now I am trying to get my Option test cases to pass before I move on to the thread test cases....

Is there standard implementation for thread block/resume in java SE?


java,multithreading,wait
I need to block execution of a thread until resumed from another thread. So I wrote my own implementation using wait() method. Which seems to be working, but it is far from simple. Is there any ready to use solution? Preferably in java SE 6? Or do I have to...

My Scala program won't print anything


scala
Basically I'm trying to write a program that has a list of books and authors and then prints 2 things out, res1, which should print the book titles of the authors that have the name "Andrei" in it and res2 that should print the book titles that have the string...

Scodec: Coproducts could not find implicit value for parameter auto: scodec.codecs.CoproductBuilderAuto


scala,scodec
I am trying to define an Scodec coproduct codec for communicating with an EELink GPS. Here is the code: import scodec.Codec import scodec.bits.ByteVector import scodec.codecs._ trait Message object Message { implicit val discriminated: Discriminated[ Message, Int ] = Discriminated(uint8) val codec: Codec[ Message ] = Codec.coproduct[ Message ].discriminatedByIndex(uint8) } case...

Retrieving TriangleCount


scala,apache-spark,spark-graphx
I'm trying to retrieve the amount of triangles from a graph using graphX. As I'm new to both Scala and graphX, I'm currently quite stuck. I'm creating a graph from an edgefile: 1 2 1 3 2 3 This should be 1 triangle. Next I'm using the build in function...

Scala string replacement of entire words that comply with a pattern


string,scala,scala-collections,scala-string
In a string, how to replace words that start with a given pattern ? For instance replace each word that starts with "th", with "123", val in = "this is the example, that we think of" val out = "123 is 123 example, 123 we 123 of" Namely how to...

Set Label From Thread


vb.net,multithreading,winforms
Form1.vb Imports System.Threading Public Class Form1 Dim demoThread As Thread Private Sub Button1_Click(sender As Object, e As EventArgs) Handles Button1.Click Dim Start As New Class1 Me.demoThread = New Thread( _ New ThreadStart(AddressOf Start.ThreadProcSafe)) Me.demoThread.Start() End Sub Delegate Sub SetTextCallback([text] As String) Public Sub SetText(ByVal [text] As String) ' InvokeRequired required...

Operand order in Scala List.prepend (::)


list,scala,operators
Odersky has brilliantly optimized Java syntax, enabling object calls without dots and parenthesis. I.e. instead of list.prepend(item), you now simply write list :: item, which also turns language operators into simple object methods. Here, List defines :: (prepend) operator. However, you normally write it vice-verse in Scala, using item ::...

Scala (Slick) HList splitting to case classes


scala,slick
currently I have a HList with more than 22 fields and now I want to split it to 2-3 case classes, is there an easy functional way to do it? Currently I use the following syntax: CaseClass1(c.head, c.tail.head, c.tail.tail.head, etc...) However that doesn't seem to be right since I have...

Images not appearing on WPF form when loading asynchronously


c#,wpf,multithreading,listbox,backgroundworker
I'm attempting to display (in a ListBox with a custom DataTemplate) a series of BitmapSource frames (thumbnails) extracted from a multi-page tiff image. When I process the tiff on the UI thread, and either directly add the images to a listbox's item collection or add them to a bound ObservableCollection,...

How to use the Akka ask pattern without blocking


scala,asynchronous,akka,future
Hi I have a actor which is responsible for fetching data from a database, turning it into a list and sending it back to the sender. I am using the ask pattern to receive response from my actor, because I don't want to use await.result because this approach will block...

Scala unapplySeq extractor syntax


scala,pattern-matching,scala-2.11
I (inadvertently) came across a bit of pattern matching syntax I did not expect to compile and now cannot figure out. It appears related to unapplySeq. Note the case x List(_,_) part in this simple example: val xs = List(1, 2, 3) //> xs : List[Int] = List(1, 2, 3)...

How does the kernel separate threads from processes


linux,multithreading,linux-kernel
Suppose I have a browser process like Firefox, that has pid = 123. Firefox has 5 opened tabs each running in a separate thread, so in total it has 5 threads. So I want to know in depth, how the kernel will separate the process into the thread to execute...

How to define a Regex in StandardTokenParsers to identify path?


regex,scala,parsing,lexical-analysis
I am writing a parser in which I want to parse arithmetic expressions like: /hdfs://xxx.xx.xx.x:xxxx/path1/file1.jpg+1 I want to parse it change the infix to postfix and do the calculation. I used helps from a part of code in another discussion as well. class InfixToPostfix extends StandardTokenParsers { import lexical._ def...

Future yielding with flatMap


scala
Given Futures fa, fb, fc, I can use f: Function1[(A,B,C), Future[D]], to return a Future[D] either by: (for { a <- fa b <- fb c <- fc } yield (a,b,c)).flatMap(f) which has the unenviable property of declaring the variables a,b,c twice. or a.zip(b).zip(c).flatMap{ case (a, (b, c)) => f(a,...

Play Framework Form Error Handling


scala,playframework,playframework-2.3,playframework-2.4
This is my view file containing the form that has to filled in by the user: @helper.form(call) { @helper.input(resumeForm("surname"), '_label -> "Surname") { (id, name, value, args) => <input name="@name" type="text" value="@value" placeholder="Enter your surname"> } } This is my custom field constructor: @(elements: helper.FieldElements) @if(!elements.args.isDefinedAt('showLabel) || elements.args('showLabel) == true)...

What happens if all node.js's worker threads are busy


javascript,node.js,multithreading
I try to understand how node.js works and although I have read this article: When is the thread pool used? I am not sure what happens if all worker threads are busy and another async I/O operation is ready to be executed. If I got this http://www.future-processing.pl/blog/on-problems-with-threads-in-node-js/ article right, the...

Scala slf4j dynamic file name


scala,logging,slf4j
I just successfully added Grizzled-SLF4J logger to my project using this link http://alvinalexander.com/scala/how-to-log-output-file-grizzled-slf4j-scala-simplelogger.properties But using this properties, there is no option to create dynamic file name: org.slf4j.simpleLogger.logFile = /tmp/myapp.log org.slf4j.simpleLogger.defaultLogLevel = info org.slf4j.simpleLogger.showDateTime = true org.slf4j.simpleLogger.dateTimeFormat = yyyy'/'MM'/'dd' 'HH':'mm':'ss'-'S org.slf4j.simpleLogger.showThreadName = true...

Calling dispatch_sync from a concurrent queue - does it block entirely?


ios,objective-c,multithreading,swift,grand-central-dispatch
Let's say I hypothetically call a dispatch_sync from a concurrent queue - does it block the entire queue or just that thread of execution?

Is this definition of a tail recursive fibonacci function tail-recursive?


scala,f#,functional-programming,tail-recursion,continuation-passing
I've seen around the following F# definition of a continuation-passing-style fibonacci function, that I always assumed to be tail recursive: let fib k = let rec fib' k cont = match k with | 0 | 1 -> cont 1 | k -> fib' (k-1) (fun a -> fib' (k-2)...

Scala running issue on eclipse


eclipse,scala
I configured everthing within eclipse for scala. I create a snippet to show you the issue, i can't see in run options run as scala application, i also tried to find my main class under build configuration option but i can't find it. How i can solve it?...

Java how to limit number of threads acting on method


java,multithreading,memory-management
I have java method in my web application doing heavy file operation. The thing is, if more than 5 threads come simultaneously (which will come in testing phase) it breaks down. I mean it cannot handle heavy traffic. That's why I want to handle maximum 5 requests at a time...

How to generalize the round methods


scala
I have the following four methods, using BigDecimal to round a number: private def round(input: Byte, scale: Int): Byte = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).byteValue() } private def round(input: Short, scale: Int): Short = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).shortValue() } private def round(input: Int, scale: Int): Int = { BigDecimal(input).setScale(scale, RoundingMode.HALF_UP).intValue() } private def...

Passing a function foreach key of an Array


scala,apache-spark,scala-collections,spark-graphx
I have an array like that : val pairs: Array[(Int, ((VertexId, Seq[Int]), Int))] which generates this output : (11,((11,ArraySeq(2, 5, 4, 5)),1)) (11,((12,ArraySeq(7, 7, 8, 2)),1)) (11,((13,ArraySeq(5, 9, 8, 7)),1)) (1,((1,ArraySeq(1, 2, 3, 4)),1)) (1,((4,ArraySeq(1, 5, 1, 1)),1)) I want to build a Graph for each pairs._1. That means for...

Implicit Generic.Aux missing on conversion from Shapeless HList to case class


scala,shapeless,type-level-computation
I just recently started learning scala and today I decided I wanted to write a CSV parser that would load nicely into case classes but store the data in rows (lists) of Shapeless's HList object so that I could get some exposure to type-level programming. Here's what I have so...

Preventing a class instantiation in Scala using Factory Pattern [duplicate]


scala,factory-pattern
This question already has an answer here: How to check constructor arguments and throw an exception or make an assertion in a default constructor in Scala? 2 answers Suppose that I have the following class defined in Scala: class ClassA(val n: Int) { ... } I want to limit...

Zipping two arrays together with index in Scala?


arrays,scala,zip
I have two arrays populated with integers. They are the same size (val array1 and val array2). I want to fuse them together into tuples with their index as the third element. For example if we have val array1 = Array(5,2,6,2) and val array2 = Array(9,8,3,4) then I want to...

Like clause not working with int column in slick


scala,slick,slick-2.0
Slick doesn't seem to support like clauses on int columns. The following code where Status.code is of int type doesn't seem to work. Would there be a workaround for this? val query = for { s <- Status if s.code like "1%" } yield (s) ...

Access key from mapValues or flatMapValues?


scala,apache-spark
In Spark 1.3, is there a way to access the key from mapValues? Specifically, if I have val y = x.groupBy(someKey) val z = y.mapValues(someFun) can someFun know which key of y it is currently operating on? Or do I have to do val y = x.map(r => (someKey(r), r)).groupBy(_._1)...

wait for an event regulary


multithreading,events
In a program I need to wait for an event (keypress) and get it's char. after that program will continue. this progress will Repeat several time. in my first try, codes run and any character did not save. I find out I should use threading but I am not Familiar...

How to instantiate lexical.Scanner in a JavaTokenParsers class?


scala,parsing,lexical-scanner
I am writing a parser which inherits from JavaTokenParsers in that I have a function as follow: import scala.util.parsing.combinator.lexical._ import scala.util.parsing._ import scala.util.parsing.combinator.RegexParsers; import scala.util.parsing.combinator.syntactical.StdTokenParsers import scala.util.parsing.combinator.token.StdTokens import scala.util.parsing.combinator.lexical.StdLexical import scala.util.parsing.combinator.lexical.Scanners import scala.util.parsing.combinator.lexical.Lexical import...

Multiple Threads searching on same folder at same time


c#,multithreading,file-search
Currently I have a .txt file of about 170,000 jpg file names and I read them all into a List (fileNames). I want to search ONE folder (this folder has sub-folders) to check if each file in fileNames exists in this folder and if it does, copy it to a...

Implementing map on a tree using fold


scala,haskell
I am trying to implement a map using fold. I could do so in Haskell data Tree a = EmptyTree | Node a (Tree a) (Tree a) deriving (Show) foldTree :: Tree a -> b -> (b -> a -> b -> b) -> b foldTree EmptyTree d _ =...

Collapse similar case statements in Scala


scala,functional-programming,pattern-matching
Is there an elegant way to do something like the following example using just one case statement? foobar match { case Node(Leaf(key, value), parent, qux) => { // Do something with parent & qux } case Node(parent, Leaf(key, value), qux) => { // Do something with parent & qux (code...

ZipList with Scalaz


list,scala,scalaz,applicative
Suppose I have a list of numbers and list of functions to apply to numbers: val xs: List[Int] = List(1, 2, 3) val fs: List[Int => Int] = List(f1, f2, f3) Now I would like to use an Applicative to apply f1 to 1, f2 to 2, etc. val ys:...

Web API - Set each thread with the HttpRequestMessage id?


c#,.net,multithreading,task-parallel-library,web-api
I have a web api coded in c#. The web api uses functionality which is shared with other in-house components. it depends on single threaded flows and uses thread local storage to store objects, and session information. Please don't say if it's good or bad, that's what I have to...

Looking to pause a thread using thread.sleep


java,multithreading
I am using a mouse listener for mouse pressed and released. When the mouse is pressed I want to have a counter incrementing a variable, and when the mouse is released I want to decrement that variable. Right now, my code is working and does that but the increment is...

std::condition_variable – notify once but wait thread wakened twice


c++,multithreading
Here's a simple C++ thread pool implementation. It's an altered version orginated from https://github.com/progschj/ThreadPool. #ifndef __THREAD_POOL_H__ #define __THREAD_POOL_H__ #include <vector> #include <queue> #include <memory> #include <thread> #include <chrono> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> namespace ThreadPool { class FixedThreadPool { public: FixedThreadPool(size_t); template<class F, class......

SCALA: change the separator in Array


arrays,string,scala,delimiter
I have an Array like this. scala> var x=Array("a","x,y","b") x: Array[String] = Array(a, x,y, b) How do I change the separator comma in array to a :. And finally convert it to string like this. String = "a:x,y:b" My aim is to change the comma(separators only) to other separator(say,:), so...

How to effectively get indices of 1s for given binary string using Scala?


scala,functional-programming,higher-order-functions
Suppose we have a binary string such as 10010010. All I want is a function returning indices of 1s for that string: indicesOfOnes("10010010") -> List(0, 3, 6) indicesOfOnes("0") -> List() And what I implemented is: def indicesOfOnes(bs: String): List[Int] = { val lb = ListBuffer[Int]() bs.zipWithIndex.foreach { case (v, i)...

How to unmarshall akka http request entity as string?


json,scala,akka-http
I'm trying to unmarshall request payload as string, but for some reason it's failing. My code: path("mypath") { post { decodeRequest { entity(as[String]) {jsonStr => //could not find implicit value for...FromRequestUnmarshaller[String] complete { val json: JsObject = Json.parse(jsonStr).as[JsObject] val jsObjectFuture: Future[JsObject] = MyDatabase.addListItem(json) jsObjectFuture.map(_.as[String]) } } } } } In...

How can I know the lock information in java?


java,multithreading,locking
Is there any tool or way that can get all the information about the locks in java? for example, if there is a java program, it creates two threads, and both threads require locks for some variable. Is there any tools that can output the information like which thread locks...

Scala first program issue


scala,recursion,case,frequency
I have just started to learn Scala after some experience with functional programming in other languages. def freq(c:Char, y:String, list:List[(Char,Int)]): List[(Char,Int)] = list match{ case _ => freq(c, y.filter(_ == c), list :: List((count(c,y),c))) case nil => list } In the above code I am getting an error when trying...