Partitioning wide rows in Spark / Cassandra
Let's assume we have a Cassandra cluster with RF = N and a table containing wide rows.
Our table could have a primary key something like this: pk / ck1 / ck2 / ... (a partition key followed by clustering keys).
If I create an RDD from the table as follows:
val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK)
I notice that one Spark node has 100% of the table and the others have none. I assume this is because the spark-cassandra-connector has no way of breaking down the query token range into smaller sub ranges because it's actually not a range -- it's simply the hash of PK.
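A quick way to see this is to count how many rows land in each Spark partition of the wide_row RDD defined above (just a sketch; the exact counts obviously depend on the data):
// Sketch: count the rows per Spark partition. With a single-partition-key
// predicate we expect one partition to hold everything and the rest to be empty.
val rowsPerPartition = wide_row
  .mapPartitionsWithIndex { (idx, rows) => Iterator((idx, rows.size)) }
  .collect()
rowsPerPartition.foreach { case (idx, count) => println(s"partition $idx -> $count rows") }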
At this point we could simply call repartition(N) to spread the data across the Spark cluster before processing, but this has the effect of moving data across the network to nodes that already have it locally in Cassandra (remember RF = N).
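For reference, that brute-force approach would look something like this (a sketch of what we want to avoid, since it forces a full shuffle):
// Sketch of the naive fix: repartition() shuffles every row over the network,
// even though each node already holds a Cassandra replica (RF = N). N here
// stands for the node count used in the prose above.
val shuffled_row = wide_row.repartition(N)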
What we would really like is to have each Spark node load a subset of the wide row locally from Cassandra.
One approach that came to mind is to generate an RDD containing the list of all values of the first clustering key (ck1) for pk = PK. We could then use mapPartitions() to load a slice of the wide row for each value of ck1.
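One way such a ck1_list might be produced (a sketch, assuming ck1 is a text column readable through the same connector; distinct() collapses the duplicates coming from the deeper clustering columns):
// Sketch: read only ck1 for the partition pk = PK and de-duplicate the values.
val ck1_list = sc.cassandraTable(KS, TABLE)
  .select("ck1")
  .where("pk = ?", PK)
  .map(_.getString("ck1"))
  .distinct()
This read still lands on a single node like the original query, but the list of ck1 values should be far smaller than the full wide row.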
Assuming we already have our list of values for ck1, we could write something like this:
val ck1_list = ....   // RDD of ck1 values
val wide_row = ck1_list
  .repartition(ck1_list.count().toInt)   // create a partition for each value of ck1
  .mapPartitionsWithIndex(f)
Within the partition function f() we would like to call another function g(pk, ck1) which loads the row slice from Cassandra for row key pk and clustering value ck1. We could then apply flatMap to ck1_list so as to create a fully distributed RDD of the wide row without any shuffling.
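To make that concrete, here is a rough sketch of what g() and f() might look like, assuming the connector's CassandraConnector can be used inside tasks (which is exactly what the question below asks), and assuming pk and ck1 are text columns; f, g, KS, TABLE and PK are the hypothetical names used above:
import com.datastax.driver.core.Session
import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

// Serializable handle to the cluster, built on the driver and shipped to the
// executors inside the task closure (an assumption to be confirmed).
val connector = CassandraConnector(sc.getConf)

// g(pk, ck1): load one slice of the wide row for a single ck1 value.
def g(session: Session, pk: String, ck1: String): Seq[(String, String)] = {
  val rs = session.execute(
    s"SELECT c1, c2 FROM $KS.$TABLE WHERE pk = ? AND ck1 = ?", pk, ck1)
  rs.all().asScala.map(r => (r.getString("c1"), r.getString("c2")))
}

// f: the function passed to mapPartitionsWithIndex; by construction each
// partition carries a single ck1 value.
def f(index: Int, ck1s: Iterator[String]): Iterator[(String, String)] =
  connector.withSessionDo { session =>
    // materialise before the session goes back to the pool
    ck1s.flatMap(ck1 => g(session, PK, ck1)).toList.iterator
  }

// wired up as in the snippet above:
// val wide_row = ck1_list.repartition(ck1_list.count().toInt).mapPartitionsWithIndex(f)
If the connector caches connections per executor, withSessionDo would also cover the "set up once and reuse" part of the question, but that is precisely the assumption being asked about here.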
So here's the question:
Is it possible to make a CQL call from within a Spark task? What driver should be used? Can it be set up only once and reused for subsequent tasks?
Any help would be greatly appreciated, thanks.