cassandra,apache-spark , Cassandra inconsistencies with batch updates


Cassandra inconsistencies with batch updates

Question:

Tag: cassandra,apache-spark

I'm using Apache Spark with Spark Cassandra Connector to write millions of rows to a Cassandra cluster. Replication factor is set to 3 and I set the write consistency to ALL in spark-submit (YARN client mode) using the following options:

spark-submit ...
--conf spark.cassandra.output.consistency.level=ALL \
--conf spark.cassandra.output.concurrent.writes=1 \
--conf spark.cassandra.output.batch.size.bytes=20000 \
...

I then wrote another Spark job to count the data I have written. I set up consistency of the new Job as follow:

spark-submit ...
--conf spark.cassandra.input.consistency.level=ONE \
--conf spark.cassandra.input.split.size=50000 \
...

From the documentation, if the write consistency plus the read consistency is greater than the replication factor, I should have consistent reads.

But I'm getting the following results:

What am I missing ? Is there any secret configuration that is set up by default (e.g. in case of issues during write then decrease the consistency level, or something like that...) or am I using a buggy version of Cassandra (it's 2.1.2), or are there issues with batch updates that spark-cassandra-connector uses for saving data to Cassandra (I'm simply using the "saveToCassandra" method) ?

What's going wrong ?


Answer:

I confirm that this is a bug in connector. Consistency level is being set on individual prepared statements and is simply ignored in case when we use batch statements. Follow the updates on the connector - the fix is going to be included in the next bug-fix release.


Related:


Install SparkR that comes with Spark 1.4


r,apache-spark,sparkr
The newest version of Spark (1.4) now comes with SparkR. Does anyone know how to go about installing the SparkR implementation on Windows? The sparkR.R script is currently located in C:/spark-1.4.0/R/pkgs/R/ This appears to be a step in the right direction, but the instructions don't work for Windows as there...

how to parse a custom log file in scala to extract some key value pairs using patterns


java,regex,scala,apache-spark
I am building a spark streaming app that takes in logs coming out of a server. A log line looks something like this. 2015-06-18T13:53:46.606-0400 CustomLog v4 INFO: source="ABCD" type="type1" <xml some xml here attr1='value1' attr2='value2' > </xml> <some more xml></> time ="232" I am trying to follow the sample app...

Spark: use reduceByKey instead of groupByKey and mapByValues


python,apache-spark,pyspark
I have an RDD with duplicates values with the following format: [ {key1: A}, {key1: A}, {key1: B}, {key1: C}, {key2: B}, {key2: B}, {key2: D}, ..] I would like the new RDD to have the following output and to get ride of duplicates. [ {key1: [A,B,C]}, {key2: [B,D]}, ..]...

Include package in Spark local mode


python,apache-spark,py.test,pyspark
I'm writing some unit tests for my Spark code in python. My code depends on spark-csv. In production I use spark-submit --packages com.databricks:spark-csv_2.10:1.0.3 to submit my python script. I'm using pytest to run my tests with Spark in local mode: conf = SparkConf().setAppName('myapp').setMaster('local[1]') sc = SparkContext(conf=conf) My question is, since...

Which spark MLIB algorithm to use?


machine-learning,apache-spark
I'm newbie to machine learning and would like to understand what algorithm (Classification algorithm or co-relation algorithm?) to use in order to understand what is the relationship between one or more attributes. for example consider I have following set of attributes, Bill No, Bill Amount, Tip amount, Waiter Name and...

Pyspark: using filter for feature selection


python,apache-spark,pyspark
I have an array of dimensions 500 x 26. Using the filter operation in pyspark, I'd like to pick out the columns which are listed in another array at row i. Ex: if a[i]= [1 2 3] Then pick out columns 1, 2 and 3 and all rows. Can this...

How to change the flush queue size of cassandra


cassandra,datastax-enterprise,datastax-java-driver
How to assign more memory for the flush queue between memtable and sstable in Cassandra. I have getting timeout errors and the heap and young region usage seems to within limits. There is no other processing happening except Cassandra in the machine. Also how to find if any requests are...

File Processing with Spark and Cassandra


cassandra,apache-spark
Right now I'm working on loading a table from a Cassandra cluster into a Spark cluster with the Datastax Cassandra Spark Connector. Right now the spark program performs a simple mapreduce job that counts the number of rows in the Cassandra table. Everything is set up and run locally. The...

Spark streaming on YARN executor's logs not available


logging,apache-spark,yarn,spark-streaming
I'm running the following code .map{x => Logger.fatal("Hello World") x._2 } It's spark streaming applciation runs on YARN. I upadted log4j and provided it with spark-submit (using --files). My Log4j configuration was loaded which I see from logs and applied to Driver's logs (I see my log level only and...

SparkR and Packages


r,apache-spark,sparkr
How do one call packages from spark to be utilized for data operations with R? example i am trying to access my test.csv in hdfs as below Sys.setenv(SPARK_HOME="/opt/spark14") library(SparkR) sc <- sparkR.init(master="local") sqlContext <- sparkRSQL.init(sc) flights <- read.df(sqlContext,"hdfs://sandbox.hortonWorks.com:8020 /user/root/test.csv","com.databricks.spark.csv", header="true") but getting error as below: Caused by: java.lang.RuntimeException: Failed to...

OutofMemoryErrory creating fat jar with sbt assembly


jar,cassandra,apache-spark,sbt
We are trying to make a fat jar file containing one small scala source file and a ton of dependencies (simple mapreduce example using spark and cassandra): import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import com.datastax.spark.connector._ import org.apache.spark.SparkConf object VMProcessProject { def main(args: Array[String]) { val conf = new SparkConf() .set("spark.cassandra.connection.host", "127.0.0.1") .set("spark.executor.extraClassPath",...

to alter or create a new table in cassandra to add new columns


database-design,cassandra,datastax,datastax-enterprise
I am using DSE cassandra. I wanted to add new attributes to the existing table. I wanted to know what is the best practice to achieve this? Should i be adding new columns to existing table or creating new table? What are the pros and cons for either approach?...

How to un-nest a spark rdd that has the following type ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]]))


scala,cassandra,apache-spark
Its a nested map with contents like this when i print it onto screen (5, Map ( "ABCD" -> Map("3200" -> 3, "3350.800" -> 4, "200.300" -> 3) (1, Map ( "DEF" -> Map("1200" -> 32, "1320.800" -> 4, "2100" -> 3) I need to get something like this Case...

PySpark No suitable driver found for jdbc:mysql://dbhost


apache-spark,apache-spark-sql,pyspark
I am trying to write my dataframe to a mysql table. I am getting No suitable driver found for jdbc:mysql://dbhost when I try write. As part of the preprocessing I read from other tables in the same DB and have no issues doing that. I can do the full run...

How to get rid of “Spark assembly has been built with Hive, including Datanucleus jars on classpath” message?


cron,apache-spark
I am running an Apache Spark app as a cron job, but I keep getting emails with the following message Spark assembly has been built with Hive, including Datanucleus jars on classpath message My cron entry is something like the following ...home/sparkJob.sh > /home/SaprkJobOperation-`date +\%Y\%m\%d\%H`-cron.log I understand that there are...

Cassandra data model to store embedded documents


mongodb,database-design,cassandra
In mongodb we can able to store embedded documents into a collection.Then, How do we store embedded documents into cassandra??? For this sample JSON representation??? UserProfile = { name: "user profile", Dave Jones: { email: {name: "email", value: "[email protected]", timestamp: 125555555}, userName: {name: "userName", value: "Dave", timestamp: 125555555} }, Paul...

Passing a function foreach key of an Array


scala,apache-spark,scala-collections,spark-graphx
I have an array like that : val pairs: Array[(Int, ((VertexId, Seq[Int]), Int))] which generates this output : (11,((11,ArraySeq(2, 5, 4, 5)),1)) (11,((12,ArraySeq(7, 7, 8, 2)),1)) (11,((13,ArraySeq(5, 9, 8, 7)),1)) (1,((1,ArraySeq(1, 2, 3, 4)),1)) (1,((4,ArraySeq(1, 5, 1, 1)),1)) I want to build a Graph for each pairs._1. That means for...

Retrieving TriangleCount


scala,apache-spark,spark-graphx
I'm trying to retrieve the amount of triangles from a graph using graphX. As I'm new to both Scala and graphX, I'm currently quite stuck. I'm creating a graph from an edgefile: 1 2 1 3 2 3 This should be 1 triangle. Next I'm using the build in function...

Spark streaming transform function


java,apache-spark,spark-streaming
I am having compilation errors in the transform function for spark streaming. Specifically seem to be missing finalizing the DStream variable or something similar. I have copied from the amplab tutorials so slightly confused... Here is the code, the problem is in the transform function towards the end. Here is...

Spark on yarn jar upload problems


java,hadoop,mapreduce,apache-spark
I am trying to run a simple Map/Reduce java program using spark over yarn (Cloudera Hadoop 5.2 on CentOS). I have tried this 2 different ways. The first way is the following: YARN_CONF_DIR=/usr/lib/hadoop-yarn/etc/hadoop/; /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class MRContainer --master yarn-cluster --jars /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar simplemr.jar This method gives the following error: diagnostics: Application application_1434177111261_0007...

Pyspark StructType is not defined


python,apache-spark,pyspark
I'm trying to struct a schema for db testing, and StructType apparently isn't working for some reason. I'm following a tut, and it doesn't import any extra module. <type 'exceptions.NameError'>, NameError("name 'StructType' is not defined",), <traceback object at 0x2b555f0>) I'm on spark 1.4.0, and Ubuntu 12 if that has anything...

Access key from mapValues or flatMapValues?


scala,apache-spark
In Spark 1.3, is there a way to access the key from mapValues? Specifically, if I have val y = x.groupBy(someKey) val z = y.mapValues(someFun) can someFun know which key of y it is currently operating on? Or do I have to do val y = x.map(r => (someKey(r), r)).groupBy(_._1)...

Count of second dimension in two dimension data in Spark


apache-spark
I have the data in this format (apple, laptop) (apple, laptop) (apple, ipad) (dell, laptop) I want to output to be (apple, laptop, 2) (apple, ipad, 1) (dell, laptop, 1) I wanted to do this using groupby and then count but groupby is not allowing grouping based on two columns....

cassandra search a row by secondary index returns null


cassandra,secondary-indexes
I have created a TABLE and index As follows CREATE TABLE refresh_token ( user_id bigint, refresh_token text, access_token text, device_desc text, device_type text, expire_time timestamp, org_id bigint, PRIMARY KEY (user_id, refresh_token) ) WITH CLUSTERING ORDER BY (refresh_token ASC) CREATE INDEX i_access_token ON demodb.refresh_token (access_token); After i insert or delete data...

Apache Spark: Error while starting PySpark


python,hadoop,apache-spark,pyspark
On a Centos machine, Python v2.6.6 and Apache Spark v1.2.1 Getting the following error when trying to run ./pyspark Seems some issue with python but not able to figure out 15/06/18 08:11:16 INFO spark.SparkContext: Successfully stopped SparkContext Traceback (most recent call last): File "/usr/lib/spark_1.2.1/spark-1.2.1-bin-hadoop2.4/python/pyspark/shell.py", line 45, in <module> sc =...

Graphx EdgeRDD count taking long time to compute


apache-spark,spark-graphx
I am running a stand alone spark, I have this code below related to EdgeRDD. These are graph edges loaded from a textfile. There are around 67 million records. val edges: RDD[Edge[Int]] = edge_file.map(line => {val x = line.split("\\s+") Edge(x(0).toLong, x(1).toLong, x(2).toInt); }) val edges1: EdgeRDD[Int] = EdgeRDD.fromEdges(edges) println(edges1.count) The...

spark-mllib: Error “reassignment to val” in source code


apache-spark,mllib
I'm using IDEA SBT project to test spark-mllib code. Here is build.sbt: name := "SparkTest" version := "1.0" scalaVersion := "2.11.6" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.2.0", "org.apache.spark" %% "spark-mllib" % "1.2.0" ) After all the import and compile work has done, I found some errors in lib...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]


scala,apache-spark,rdd
I did some calculation and returned my values in a RDD containing scala map and now I want to remove this map and want to collect all keys values in a RDD. Any help will be appreciated....

Profiling a scala spark application


scala,apache-spark
I would like to profile my spark scala applications to figure out the parts of the code which i have to optimize. I enabled -Xprof in --driver-java-options but this is not of much help to me as it gives lot of granular details. I am just interested to know how...

Is it possible to use a timestamp in ms since epoch in select statement for Cassandra?


cassandra,timestamp,cql
I know that using the formats listed here (http://docs.datastax.com/en/cql/3.0/cql/cql_reference/timestamp_type_r.html) work to query cassandra. However, I'm having a hard time determining if it is even possible to use ms since epoch in the select statement. I feel like it should since it data can be sent to cassandra in the ms...

Removing duplicates from Spark RDDPair values


python,apache-spark,pyspark
I am new to Python and also Spark. I've an pair RDD containing (key, List) but some of the values are duplicate. RDD is of the form (zipCode,streets) I want a pair RDD which does not contain duplicates. I am trying to achieve it using python. Can anyone please help...

Connecting from Spark/pyspark to PostgreSQL


postgresql,jdbc,jar,apache-spark,pyspark
I've installed Spark on a Windows machine and want to use it via Spyder. After some troubleshooting the basics seems to work: import os os.environ["SPARK_HOME"] = "D:\Analytics\Spark\spark-1.4.0-bin-hadoop2.6" from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext spark_config = SparkConf().setMaster("local[8]") sc = SparkContext(conf=spark_config) sqlContext = SQLContext(sc) textFile = sc.textFile("D:\\Analytics\\Spark\\spark-1.4.0-bin-hadoop2.6\\README.md") textFile.count() textFile.filter(lambda...

Spark executors with different amounts of memory on Mesos


apache-spark,mesos
Is it possible to have executors with different amounts of memory on a Mesos cluster? Or am I bounded by the machine with the least memory? (Assuming I want to use all available cpus).

Using partition key along with secondary index


cassandra,nosql,bigdata,cassandra-2.0
Following are the two queries that I need to perform. select * from where dept = 100 and emp_id = 1; select * from where dept = 100 and name = 'One'; Which of the below options is better ? Option 1: Use secondary index along with a partition key....

Select first N rows of Cassandra table


cassandra,cql
As stated in this doc to select a range of rows i have to write this: select first 100 col1..colN from table; but when I launch this on cql shell I get this error: <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:13 no viable alternative at input '100' (select...

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver


cassandra,apache-spark,apache-spark-sql,spark-jobserver,spark-cassandra-connector
So I'm trying to run job that simply runs a query against cassandra using spark-sql, the job is submitted fine and the job starts fine. This code works when it is not being run through spark jobserver (when simply using spark submit). Could someone tell my what is wrong with...

Shuffled vs non-shuffled coalesce in Apache Spark


scala,apache-spark,bigdata,distributed-computing
What is the difference between the following transformations when they are executed right before writing RDD to a file? coalesce(1, shuffle = true) coalesce(1, shuffle = false) Code example: val input = sc.textFile(inputFile) val filtered = input.filter(doSomeFiltering) val mapped = filtered.map(doSomeMapping) mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile) vs mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)...

Exporting Data from Cassandra to CSV file


apache,csv,cassandra,export,export-to-csv
Table Name : Product uid | productcount | term | timestamp 304ad5ac-4b6d-4025-b4ea-8b7991a3fe72 | 26 | dress | 1433110980000 6097e226-35b5-4f71-b158-a1fe39a430c1 | 0 | #751104 | 1433861040000 Command : COPY product (uid, productcount, term, timestamp) TO 'temp.csv'; Error: Improper COPY command. Am I missing something? ...

Issue with UDF on a column of Vectors in PySpark DataFrame


apache-spark,apache-spark-sql,pyspark,spark-sql
I am having trouble using a UDF on a column of Vectors in PySpark which can be illustrated here: from pyspark import SparkContext from pyspark.sql import Row from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf from pyspark.mllib.linalg import Vectors FeatureRow = Row('id', 'features') data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])), (1,...

How to extract application ID from the PySpark context


apache-spark,yarn,pyspark
A previous question recommends sc.applicationId, but it is not present in PySpark, only in scala. So, how do I figure out the application id (for yarn) of my PySpark process?...

In sbt, how can we specify the version of hadoop on which spark depends?


apache-spark,sbt
Well I have a sbt project which uses spark and spark sql, but my cluster uses hadoop 1.0.4 and spark 1.2 with spark-sql 1.2, currently my build.sbt looks like this: libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", "com.datastax.cassandra" % "cassandra-driver-mapping" % "2.1.5", "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.2.1", "org.apache.spark" %...

Conditionally Combining/Reducing key-pairs


python,apache-spark,pyspark
I've had this issue for some time now, and I think it has to do with my lack of understanding of how to use combineByKey and reduceByKey, so hopefully somebody can clear this up. I am working with DNA sequences, so I have a procedure to produce a bunch of...

Is the DStream return by updateStateByKey function only contains one RDD?


apache-spark,spark-streaming,apache-spark-sql,pyspark
Is the DStream return by updateStateByKey function only contains one RDD? If not,Under what circumstances will the DStream contains more than one RDD?

Join files using Apache Spark / Spark SQL


java,apache-spark,apache-spark-sql
I am trying to use Apache Spark for comparing two different files based on some common field, and get the values from both files and write it as output file. I am using Spark SQL for joining both files (after storing the RDD as table). Is this the correct approach?...

Spark-submit class not found exception


scala,apache-spark
I am trying to run this simple spark application with the spark submit command using this quick start tutorial. http://spark.apache.org/docs/1.2.0/quick-start.html#self-contained-applications. when I try to run it using spark-1.4.0-bin-hadoop2.6\bin>spark-submit --class " SimpleApp" --master local[4] C:/.../Documents/Sparkapp/target/scala- 2.10/simple-project_2.10-1.0.jar I get the following exception: java.lang.ClassNotFoundException: SimpleApp at java.net.URLClassLoader$1.run(Unknown Source) at java.net.URLClassLoader$1.run(Unknown...

How do I flatMap a row of arrays into multiple rows?


apache-spark,apache-spark-sql
After parsing some jsons I have a one-column DataFrame of arrays scala> val jj =sqlContext.jsonFile("/home/aahu/jj2.json") res68: org.apache.spark.sql.DataFrame = [r: array<bigint>] scala> jj.first() res69: org.apache.spark.sql.Row = [List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)] I'd like to explode each row out into several rows. How? edit: Original json file:...