
Apache Spark and Cassandra: Best NoSQL Big Data combination

Publish Date: 2022-10-07


Apache Spark™ is widely considered one of the most powerful engines for executing data engineering, data science, and machine learning workloads on single-node machines or clusters, over diverse data sources such as NoSQL databases.

Apache Cassandra is an open source NoSQL distributed database trusted by thousands of companies for scalability and high availability without compromising performance, which makes it a perfect platform for mission-critical data.

In today's article we will learn how to get started working with Cassandra from Apache Spark, using Scala, and cover the different approaches involved.

Before moving further, let's take a look at how Cassandra works and get some data ready to work with.

1. Getting Started with Cassandra

Just as relational models have the concept of a database, Cassandra has the concept of a keyspace, which is a container for tables. A keyspace basically has a name and a set of attributes that define its behavior, such as the replication class and replication factor.

To list the keyspaces in your environment, type the following command in the cqlsh shell:


DESCRIBE KEYSPACES;

system_auth         system_schema  system_views
system  system_distributed  system_traces  system_virtual_schema

As you can see, all the keyspaces that start with system_* come with the Cassandra installation by default.

Create Keyspace

To create a keyspace, we must set a name along with a replication class and replication factor:


CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

In our example we gave our keyspace the name test and specified the class and replication factor:

class: here we set SimpleStrategy because we are running Cassandra on a single node. For a cluster or production environment you would set it to NetworkTopologyStrategy.

replication_factor: because we have a single node, 1 copy of the data is enough, but in a distributed multi-node Cassandra deployment you would typically set it to 3 for availability and fault tolerance.

After you create your keyspace, switch to it by typing:


USE test;
cqlsh:test>

Create Table

The next step is to create a table to hold data. The following command creates a table with 2 columns: word of type text and count of type int; the word column is the primary key:


CREATE TABLE test.words (word text PRIMARY KEY, count int);

Now we have to insert some rows into the table; run the following statements to do so:


INSERT INTO words(word, count) VALUES ('and', 50);
INSERT INTO words(word, count) VALUES ('he', 10);
INSERT INTO words(word, count) VALUES ('it', 12);

The returned rows show that the inserts were successful:


SELECT * FROM words;

 word | count
------+-------
   he |    10
   it |    12
  and |    50

(3 rows)

Now that our data is ready in Cassandra, the next step is to start working with Spark.

2. Getting Ready with Spark

Managing dependencies

Before going further, make sure that your Spark version is not above 3.2.x: even though Spark 3.3.0 is available, the Cassandra connector does not support it yet, at least not all of the Spark 3.3 features. You can refer to the documentation to see which connector version is compatible with your installation of Cassandra and Spark.

In our demo we will use Spark 3.2.1 and the DataStax Spark Cassandra Connector 3.2.0. You can download the spark-cassandra-connector jar file, or just pick the right one for you from the Maven repository.

Whichever version you use, copy it into the jars folder inside SPARK_HOME in order to use it later. You can use the following commands to do just that:


cd $SPARK_HOME/jars
wget https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.2.0/spark-cassandra-connector-assembly_2.12-3.2.0.jar
ls -l | grep spark-cassandra
-rw-rw-r-- 1 ahmed ahmed 14911489 May 26 09:20 spark-cassandra-connector-assembly_2.12-3.2.0.jar

Using SBT

If you are working in an sbt project, add this dependency to the list of dependencies in your build.sbt file:


// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.2.0"
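
With the dependency in place, here is a minimal sketch of how a standalone sbt application might build its own SparkSession, mirroring the --conf flags we pass to spark-shell below (the application name and local master are placeholders for this demo):


import org.apache.spark.sql.SparkSession

// minimal sketch for a standalone app (app name is hypothetical; adjust the master for your cluster)
val spark = SparkSession.builder().
  appName("spark-cassandra-demo").
  master("local[*]").
  config("spark.cassandra.connection.host", "127.0.0.1").
  config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions").
  getOrCreate()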

a. Start spark-shell using the jar file

For the rest of our demo we will work with spark-shell to query Cassandra. You can start spark-shell and access the Scala console for an interactive session using this syntax:


cd $SPARK_HOME
spark-shell --driver-class-path jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar \
  --jars jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

The extensions configuration option enables Cassandra-specific Catalyst optimizations and functions.

Note: if you don't want to start spark-shell from the installation path, just use the full path to the jar file.

b. Start spark-shell without a jar file

Another way to start spark-shell, without using a local jar file, is to specify the Cassandra connector package when starting Spark (you need to be connected to the internet in order to download the required dependencies), as the next command shows:


spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.2.0

3. Accessing Cassandra data

Typically, to access and work with Cassandra data from Spark, you use either Data Source V1 of the Spark Cassandra Connector, which is the old way but still works and can return data as RDDs or DataFrames/Datasets, or Data Source V2 of the Spark Cassandra Connector, which is the new and recommended way.

a. Accessing Cassandra data with CassandraRDD

The first step is to import the DataStax Spark connector:


import com.datastax.spark.connector._

Now we point Spark at our keyspace and table and assign the result to an RDD variable:


val keyspace = "test"
val table = "words"
val rdd = sc.cassandraTable(keyspace, table)

We can print the fetched data; the returned rows are of type CassandraRow:


rdd.foreach(println)
CassandraRow{word: he, count: 10}
CassandraRow{word: it, count: 12}
CassandraRow{word: and, count: 50}
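
As a small sketch (the threshold of 20 is just for illustration), CassandraRow exposes typed getters such as getString and getInt, so we can turn the rows into plain Scala values and use ordinary RDD operations on them:


// convert each CassandraRow into a (word, count) tuple, then filter client-side
val pairs = rdd.map(row => (row.getString("word"), row.getInt("count")))
pairs.filter(_._2 > 20).collect().foreach(println)
// given the three rows above, this prints: (and,50)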

If we want to write some rows back to our 'words' Cassandra table, we can do so using the following syntax:


val new_rows = sc.parallelize(Seq(("did", 200), ("run", 74)))
new_rows.saveToCassandra(keyspace, table)

If we print our data again using the previous statement, we will see the new rows:


rdd.foreach(println)
CassandraRow{word: he, count: 10}
CassandraRow{word: it, count: 12}
CassandraRow{word: and, count: 50}
CassandraRow{word: run, count: 74}
CassandraRow{word: did, count: 200}

b. Reading Cassandra data as a DataFrame

RDDs are useful only in a limited scope, while the DataFrame/Dataset API offers a wider range of options and more flexibility for dealing with the data.

To read the data returned from Cassandra as a DataFrame, we have to specify the format as shown in the next statement:


val df = spark.read.
  format("org.apache.spark.sql.cassandra").
  options(Map(
    "table" -> "words",
    "keyspace" -> "test"
  )).load()

Now we have our data, so we can do any kind of manipulation on it. Let's show its content:


df.show()
+----+-----+
|word|count|
+----+-----+
| did|  200|
|  it|   12|
| and|   50|
| run|   74|
|  he|   10|
+----+-----+

Let's say you want to get the words which have a count greater than 70:


df.filter("count > 70").show()
+----+-----+
|word|count|
+----+-----+
| did|  200|
| run|   74|
+----+-----+
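
Any other DataFrame operation works the same way; for example, a quick aggregation over the count column (given the rows shown above, the total should come to 346):


// sum the count column across all words
import org.apache.spark.sql.functions.sum

df.agg(sum("count").alias("total")).show()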

To save the DataFrame back into a specific table in Cassandra, you can do the following:


df.write.
  format("org.apache.spark.sql.cassandra").
  options(Map(
    "table" -> "words",
    "keyspace" -> "test"
  )).save()

If you execute this statement as it is, it will return an error because the same data already exists; you can avoid that behavior by specifying the save mode as either 'append' or 'overwrite', or by changing the table and keyspace names.
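
For example, here is a minimal sketch of the same write with the save mode set to append (in Cassandra an append simply upserts rows that share a primary key; for 'overwrite' the connector typically also expects the confirm.truncate option):


df.write.
  format("org.apache.spark.sql.cassandra").
  options(Map(
    "table" -> "words",
    "keyspace" -> "test"
  )).
  mode("append").   // upserts rows with matching primary keys instead of failing
  save()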

c. Accessing Cassandra data with Catalogs

As you can see, the old method basically uses RDDs, which are hard to deal with in most cases. The new API uses DataFrames and Spark SQL, and the way to work with it is through Catalogs.

Create a Catalog Reference to your Cassandra Cluster

The first thing to do is to create a catalog reference to your Cassandra cluster. In our case we give the catalog the name 'mycatalog' (you can choose any name you want, but it should be unique):


spark.conf.set("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")

Now that we have our catalog created, we have more control over accessing Cassandra keyspaces and metadata.
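
For instance, standard Spark SQL catalog commands should now work against Cassandra through the catalog; a small sketch that describes the test keyspace we created earlier:


// inspect the keyspace properties through the registered catalog
spark.sql("DESCRIBE NAMESPACE EXTENDED mycatalog.test").show(truncate = false)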

Create a keyspace and table in Cassandra

To differentiate from the example above, we will create a new keyspace directly from Spark, without needing the cqlsh shell.

The next line will create an actual keyspace named test2 in Cassandra:


spark.sql("CREATE DATABASE IF NOT EXISTS mycatalog.test2 WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')")

The next statement will create a table named words2 in our keyspace test2. The table is composed of 2 columns, key and value:


spark.sql("CREATE TABLE IF NOT EXISTS mycatalog.test2.words2 (key Int, value STRING) USING cassandra PARTITIONED BY (key)")
to list the keyspaces in cassandra, we run commands through our catalog using SparkSQL way:

spark.sql("SHOW NAMESPACES FROM mycatalog").show
+-----------+
|  namespace|
+-----------+
|       test|
|      store|
|reservation|
|      test2|
|my_keyspace|
|     testks|
|      hotel|
+-----------+

To list the tables for a specific keyspace:


spark.sql("SHOW TABLES FROM mycatalog.test2").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    test2|   words2|      false|
+---------+---------+-----------+

Loading and analyzing data from Cassandra

You can use Spark SQL or a DataFrameReader to load an existing Cassandra table:


val df = spark.read.table("mycatalog.test2.words2")

Because the table is empty, the result will contain 0 rows:


println(df.count)
0

df.show
+---+-----+
|key|value|
+---+-----+
+---+-----+

Or you can use Spark SQL as well to query our Cassandra table, using the syntax in the next line:


spark.sql("SELECT * FROM mycatalog.test2.words2").show

Saving data from a DataFrame to Cassandra

Let's create a DataFrame that contains 9 rows (range(1, 10) excludes the upper bound):


val df2 = spark.
  range(1, 10).
  withColumnRenamed("id", "key").
  withColumn("value", col("key").cast("string") * 2)

df2.show()
+---+-----+
|key|value|
+---+-----+
|  1|  2.0|
|  2|  4.0|
|  3|  6.0|
|  4|  8.0|
|  5| 10.0|
|  6| 12.0|
|  7| 14.0|
|  8| 16.0|
|  9| 18.0|
+---+-----+

Let's populate our words2 table with the data from our DataFrame df2:


df2.writeTo("mycatalog.test2.words2").append

Let's verify that our data was written successfully:


spark.sql("SELECT * FROM mycatalog.test2.words2").show
+---+-----+
|key|value|
+---+-----+
|  3|  6.0|
|  6| 12.0|
|  5| 10.0|
|  9| 18.0|
|  7| 14.0|
|  8| 16.0|
|  2|  4.0|
|  1|  2.0|
|  4|  8.0|
+---+-----+

As you can see, the data was written successfully.
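
As a final, hedged sketch, the same DataFrameWriterV2 API can also create a brand-new Cassandra table straight from a DataFrame; here words3 is a hypothetical table name, and we use the key column as the partition key:


import org.apache.spark.sql.functions.col

// create a new table in the test2 keyspace from df2 (words3 is a hypothetical name)
df2.writeTo("mycatalog.test2.words3").
  partitionedBy(col("key")).
  create()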

Conclusion

After completing the article and practicing with the examples, you can now manipulate a Cassandra database from Spark using the old V1 data source or the new and more powerful V2 data source.