Publish Date: 2022-10-07
Apache Spark™ is widely considered one of the most powerful engines for executing data engineering, data science, and machine learning workloads on single-node machines or clusters, against diverse data sources such as NoSQL databases.
Apache Cassandra is an open-source NoSQL distributed database trusted by thousands of companies for scalability and high availability without compromising performance, which makes it the perfect platform for mission-critical data.
In this article we will learn how to get started working with Cassandra from Apache Spark, using Scala and the different approaches involved.
Before moving further, let’s take a look at how Cassandra works and get some data ready to work with.
Just as relational databases have the concept of a database, Cassandra has the concept of a keyspace, which is a container for tables. A keyspace basically has a name and a set of attributes that define its behavior, such as the replication class and replication factor.
To list the keyspaces in your environment, type the following command in the cqlsh shell:
DESCRIBE KEYSPACES;
system_auth system_schema system_views
system system_distributed system_traces system_virtual_schema
As you can see, all the keyspaces that start with system_* come with the Cassandra installation by default.
To create a keyspace, we must set a name along with the replication class and replication factor:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
In our example we gave our keyspace the name test and specified the class and replication:
class: here we set the class to SimpleStrategy because we are running Cassandra on a single node. For a cluster or production environment you would set it to "NetworkTopologyStrategy".
replication factor: because we have a single node, one copy of the data is enough, but in a distributed multi-node Cassandra deployment you should set it to 3 for availability and fault tolerance.
After you create your keyspace, switch to it by typing:
USE test;
cqlsh:test>
The next step is to create a table to hold data. The following command creates a table with two columns: word of type text and count of type int; the word column is the primary key:
CREATE TABLE test.words (word text PRIMARY KEY, count int);
Now we have to insert some rows into the table; use the following syntax to do so:
INSERT INTO words(word, count) VALUES ('and', 50);
INSERT INTO words(word, count) VALUES ('he', 10);
INSERT INTO words(word, count) VALUES ('it', 12);
The rows returned by a SELECT show that the inserts were successful:
SELECT * FROM words;
word | count
------+-------
he | 10
it | 12
and | 50
(3 rows)
Now that our data is ready in Cassandra, the next step is to start working with Spark.
Before going further, make sure that your Spark version is not above 3.2.x: even though Spark 3.3.0 is available, the Cassandra connector does not yet support it, at least not all of Spark 3.3's features. You can refer to the documentation to check which connector version matches your installation of Cassandra and Spark.
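If you are not sure which Spark version you are running, you can confirm it from the Scala prompt once the spark-shell (used later in this article) is started:
// prints the running Spark version, e.g. 3.2.1
println(spark.version)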
In our demo we will use Spark 3.2.1 and the DataStax Spark Cassandra Connector 3.2. You can download the Spark Cassandra Connector jar file or just pick the right one for your setup from the Maven repository.
Whichever version you use, copy it into the jars folder inside SPARK_HOME in order to use it later. You can use the following commands to achieve just that:
cd $SPARK_HOME/jars
wget https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.2.0/spark-cassandra-connector-assembly_2.12-3.2.0.jar
ls -l | grep spark-cassandra
-rw-rw-r-- 1 ahmed ahmed 14911489 May 26 09:20 spark-cassandra-connector-assembly_2.12-3.2.0.jar
If you are working in an sbt project, add this dependency to the list of dependencies in your build.sbt file:
// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.2.0"
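If you build a standalone sbt application instead of using the spark-shell, the same two settings passed as --conf flags later in this article can be set when constructing the SparkSession. This is only a minimal sketch; the application name and local master are illustrative choices:
import org.apache.spark.sql.SparkSession

// Minimal sketch: SparkSession for a standalone sbt application.
// The app name and local[*] master are illustrative; the connection host matches
// the single-node setup used throughout this article.
val spark = SparkSession.builder()
  .appName("spark-cassandra-demo")
  .master("local[*]")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()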
In the rest of our demo we will work with the spark-shell to query Cassandra. You can start the spark-shell and get a Scala console for an interactive session using this syntax:
cd $SPARK_HOME
spark-shell --driver-class-path jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar \
  --jars jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
The spark.sql.extensions configuration option enables Cassandra-specific Catalyst optimizations and functions.
Note: if you don't want to start the spark-shell from the installation path, just use the full path to the jar file.
Another way to start the spark-shell without downloading a jar file manually is to specify the Cassandra connector package when launching Spark (you need to be connected to the internet so that the required dependencies can be downloaded), as the following command shows:
spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.2.0
Typically, to access Cassandra data from Spark you use either Data Source V1 of the Spark Cassandra Connector, which is the old way but still works and can return data as RDDs or DataFrames/Datasets, or Data Source V2 of the Spark Cassandra Connector, which is the new and recommended way.
The first step is to import the DataStax Spark connector:
import com.datastax.spark.connector._
Now we connect to the table and assign the result to an RDD variable:
val keyspace = "test"
val table = "words"
val rdd = sc.cassandraTable(keyspace, table)
We can print the fetched data; the returned elements are of type CassandraRow:
rdd.foreach(println)
CassandraRow{word: he, count: 10}
CassandraRow{word: it, count: 12}
CassandraRow{word: and, count: 50}
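Each element of the RDD is a CassandraRow, so you can pull typed values out of it by column name. You can also push the column selection and a filter down to Cassandra instead of fetching whole rows; the following is only a sketch against the same test.words table:
// extract typed values from each CassandraRow by column name
val pairs = rdd.map(row => (row.getString("word"), row.getInt("count")))
pairs.foreach(println)

// sketch: let Cassandra return only the selected columns for the matching partition
val filtered = sc.cassandraTable(keyspace, table)
  .select("word", "count")
  .where("word = ?", "and")
filtered.collect.foreach(println)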
If we want to write some rows back to our 'words' Cassandra table, we can do so using the following syntax:
val new_rows = sc.parallelize(Seq(("did", 200), ("run", 74)))
new_rows.saveToCassandra(keyspace, table)
If we print the data again using the previous statement, you will see the new rows:
rdd.foreach(println)
CassandraRow{word: he, count: 10}
CassandraRow{word: it, count: 12}
CassandraRow{word: and, count: 50}
CassandraRow{word: run, count: 74}
CassandraRow{word: did, count: 200}
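If the tuple shape does not match the table exactly, or you only want to write a subset of the columns, you can name the target columns explicitly with SomeColumns. A minimal sketch against the same test.words table (the sample values are made up for illustration):
// sketch: name the target columns explicitly when saving an RDD of tuples
val more_rows = sc.parallelize(Seq(("cat", 7), ("dog", 9)))
more_rows.saveToCassandra(keyspace, table, SomeColumns("word", "count"))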
RDDs are useful only in a limited scope, while the DataFrame/Dataset API offers a much wider range of options and more flexibility for working with data.
To read the data returned from Cassandra as a DataFrame, we have to specify the format as in the next statement:
val df = spark.read.
| format("org.apache.spark.sql.cassandra").
| options(
| Map(
| "table" -> "words",
| "keyspace" -> "test"
| )
| ).load()
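If you import the connector's Spark SQL helpers, the same read can be written a bit more compactly with cassandraFormat; the following is an equivalent sketch of the statement above:
import org.apache.spark.sql.cassandra._

// equivalent, more compact read of test.words using the connector's helper
val df_alt = spark.read.cassandraFormat("words", "test").load()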
Now we have our data, so we can do any kind of manipulation on it. Let's show its content:
df.show()
+----+-----+
|word|count|
+----+-----+
| did| 200|
| it| 12|
| and| 50|
| run| 74|
| he| 10|
+----+-----+
Let's say you want to get the words that have a count greater than 70:
df.filter("count > 70").show()
+----+-----+
|word|count|
+----+-----+
| did| 200|
| run| 74|
+----+-----+
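Since the connector can also return Datasets, you can map the same rows onto a case class to get compile-time types; a minimal sketch, where the case class name WordCount is just an illustrative choice:
// minimal sketch: view the same DataFrame as a typed Dataset
import spark.implicits._
case class WordCount(word: String, count: Int)
val ds = df.as[WordCount]
ds.filter(_.count > 70).show()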
To save the DataFrame back into a specific table in Cassandra, you can do the following:
df.write.
| format("org.apache.spark.sql.cassandra").
| options(
| Map(
| "table" -> "words",
| "keyspace" -> "test" )
| ).save()
If you execute this statement as it is, it will return an error, because the target table already exists and the default save mode is error-if-exists; you can avoid that behavior by setting the mode option to 'append' or 'overwrite', or by changing the table and keyspace names.
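For example, a sketch of the same write with the save mode set to append (rows whose primary key already exists are simply upserted, which is normal Cassandra behavior):
// same write as above, with an explicit save mode so the existing table is accepted
df.write.
  format("org.apache.spark.sql.cassandra").
  mode("append").
  options(Map("table" -> "words", "keyspace" -> "test")).
  save()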
As you can see, the old method is basically RDD-based, and it can be hard to work with in many cases. The new API uses DataFrames and Spark SQL, and works through catalogs.
The first thing to do is to create a catalog reference to your Cassandra cluster; in our case we give the catalog the name 'mycatalog' (you can choose any name you want, but it should be unique):
spark.conf.set("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
Now that we have our catalog created, we have more control over accessing Cassandra keyspaces and metadata.
To differentiate from the example above, we will create a new keyspace directly from Spark, without needing the cqlsh Cassandra shell.
The next line will create an actual keyspace named test2 in Cassandra:
spark.sql("CREATE DATABASE IF NOT EXISTS mycatalog.test2 WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')")
The next line will create a table named words2 in our keyspace test2. The table is composed of two columns, key and value:
spark.sql("CREATE TABLE IF NOT EXISTS mycatalog.test2.words2 (key Int, value STRING) USING cassandra PARTITIONED BY (key)")
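You can confirm the schema that was just created by describing the table through the catalog with standard Spark SQL; the output should list the key and value columns and the partitioning:
// inspect the newly created table through the catalog
spark.sql("DESCRIBE TABLE mycatalog.test2.words2").show(false)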
To list the keyspaces in Cassandra, we run commands through our catalog the Spark SQL way:
spark.sql("SHOW NAMESPACES FROM mycatalog").show
+-----------+
| namespace|
+-----------+
| test|
| store|
|reservation|
| test2|
|my_keyspace|
| testks|
| hotel|
+-----------+
To list the tables in a specific keyspace:
spark.sql("SHOW TABLES FROM mycatalog.test2").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test2| words2| false|
+---------+---------+-----------+
You can use a DataFrameReader to load an existing Cassandra table through the catalog:
val df = spark.read.table("mycatalog.test2.words2")
Because the table is empty, the result actually contains 0 rows:
println(df.count)
0
df.show
+---+-----+
|key|value|
+---+-----+
+---+-----+
Or you can query the table with Spark SQL as well, using the syntax in the next line:
spark.sql("SELECT * FROM mycatalog.test2.words2").show
Let's create a DataFrame that contains nine rows (the ids 1 through 9):
val df2 = spark.
| range(1, 10).
| withColumnRenamed("id", "key").
| withColumn("value", col("key").cast("string") * 2)
df2.show()
+---+-----+
|key|value|
+---+-----+
| 1| 2.0|
| 2| 4.0|
| 3| 6.0|
| 4| 8.0|
| 5| 10.0|
| 6| 12.0|
| 7| 14.0|
| 8| 16.0|
| 9| 18.0|
+---+-----+
Let's populate our words2 table with the data from our DataFrame df2:
df2.writeTo("mycatalog.test2.words2").append
Let's verify that our data was written successfully:
spark.sql("SELECT * FROM mycatalog.test2.words2").show
+---+-----+
|key|value|
+---+-----+
| 3| 6.0|
| 6| 12.0|
| 5| 10.0|
| 9| 18.0|
| 7| 14.0|
| 8| 16.0|
| 2| 4.0|
| 1| 2.0|
| 4| 8.0|
+---+-----+
As you can see, the data was written successfully.
After completing this article and practicing with the examples, you can now manipulate a Cassandra database from Spark using either the old V1 data source or the new and more powerful V2 data source.