import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._
import com.datastax.spark.connector.cql._
import com.datastax.oss.driver.api.core._
import org.apache.spark.sql.functions.rand
import com.amazonaws.services.glue.log.GlueLogger
import java.time.ZonedDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import java.time.format.DateTimeFormatter


object GlueApp {

  def main(sysArgs: Array[String]): Unit = {

    val requiredParams = Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF")

    val optionalParams = Seq("DISTINCT_KEYS", "QUERY_FILTER", "FORMAT", "S3_URI")

    // Keep only the optional parameters that were actually passed in sysArgs
    val validOptionalParams = optionalParams.filter(param => sysArgs.contains(s"--$param"))

    // Combine required and supplied optional parameters
    val validParams = requiredParams ++ validOptionalParams

    val args = GlueArgParser.getResolvedOptions(sysArgs, validParams.toArray)

    val driverConfFileName = args("DRIVER_CONF")

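    // Notes on the connector settings below (assumptions based on common Amazon Keyspaces guidance; tune for your account):
    // - Amazon Keyspaces accepts only LOCAL_QUORUM for write consistency; LOCAL_ONE is used here for reads.
    // - batch.size.rows = 1 with batch.grouping.key = none effectively disables batching, so each delete is sent as its own request.
    // - concurrent.writes / concurrent.reads limit per-task parallelism to help avoid capacity (rate) errors.
    // - spark.task.maxFailures = 100 lets Spark retry tasks that hit transient throttling.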
    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "100"),

          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions"),
          ("directJoinSetting", "on"),

          ("spark.cassandra.output.consistency.level", "LOCAL_QUORUM"), // writes
          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),     // reads

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "50"),

          ("spark.cassandra.output.concurrent.writes", "3"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1"),
          ("spark.cassandra.output.ignoreNulls", "true")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val logger = new GlueLogger

    // Validation steps for peers and partitioner
    val connector = CassandraConnector(conf)
    val session = connector.openSession()
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()

    val partitioner = session.execute("SELECT partitioner FROM system.local").one().getString("partitioner")

    logger.info("Total number of peers: " + peersCount)
    logger.info("Configured partitioner: " + partitioner)

    if(peersCount == 0){
      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using a VPC endpoint, check permissions for describing VPC endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }

    if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){
      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in the Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }

    val backupLocation = args.getOrElse("S3_URI", "")
    val backupFormat = args.getOrElse("FORMAT", "parquet")
    val filterCriteria = args.getOrElse("QUERY_FILTER", "")

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")


    val query =
      s"""
         |SELECT column_name, kind
         |FROM system_schema.columns
         |WHERE keyspace_name = '$keyspaceName' AND table_name = '$tableName';
         |""".stripMargin

    // Execute the query
    val resultSet = session.execute(query)

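    // In system_schema.columns, "kind" is one of partition_key, clustering, regular, or static;
    // only partition_key and clustering columns make up the primary key.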
    val validKinds = Set("partition_key", "clustering")

    // Extract the primary key column names
    val primaryKeyColumnsCSV = resultSet.all().asScala
      .filter(row => validKinds.contains(row.getString("kind")))
      .map(_.getString("column_name"))
      .toList
      .mkString(", ")

    // Output the primary key columns
    logger.info(s"Primary Key Columns for $keyspaceName.$tableName: ${primaryKeyColumnsCSV}")

    // Use the caller-supplied DISTINCT_KEYS if present, otherwise fall back to the full primary key
    val distinctKeys = args.getOrElse("DISTINCT_KEYS", primaryKeyColumnsCSV).filterNot(_.isWhitespace).split(",")

    // Output the key columns used for the delete
    logger.info(s"Key columns used for delete on $keyspaceName.$tableName: ${distinctKeys.mkString(", ")}")

    var tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> tableName,
                    "keyspace" -> keyspaceName,
                    "pushdown" -> "false")) // set to true when executing against Apache Cassandra, false when working with Keyspaces
      .load()

    if(filterCriteria.trim.nonEmpty){
      tableDf = tableDf.filter(filterCriteria)
    }
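    // QUERY_FILTER, when supplied, is a Spark SQL filter expression evaluated on the Spark side
    // (pushdown is disabled above), e.g. a hypothetical "event_date = '2024-01-01' AND region = 'us-east-1'".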

    // Back up to S3 the data that will be deleted
    if(backupLocation.trim.nonEmpty){
      val now = ZonedDateTime.now( ZoneOffset.UTC )//.truncatedTo( ChronoUnit.MINUTES ).format( DateTimeFormatter.ISO_DATE_TIME )

      // Backup location for deletes
      val fullbackuplocation = backupLocation +
        "/export" +
        "/" + keyspaceName +
        "/" + tableName +
        "/bulk-delete" +
        "/year=" + "%04d".format(now.getYear()) +
        "/month=" + "%02d".format(now.getMonthValue()) +
        "/day=" + "%02d".format(now.getDayOfMonth()) +
        "/hour=" + "%02d".format(now.getHour()) +
        "/minute=" + "%02d".format(now.getMinute())

      tableDf.write.format(backupFormat).mode(SaveMode.ErrorIfExists).save(fullbackuplocation)
    }

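    // Select only the key columns and issue deletes through the Spark Cassandra Connector's deleteFromCassandra.
    // With the full primary key this targets individual rows; if DISTINCT_KEYS names only the partition key
    // columns, entire partitions are expected to be removed.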
    tableDf.select(distinctKeys.head, distinctKeys.tail:_*).rdd.deleteFromCassandra(keyspaceName, tableName)

    Job.commit()
  }
}
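// Example Glue job arguments (hypothetical values, for illustration only):
//   --JOB_NAME      bulk-delete-job
//   --KEYSPACE_NAME my_keyspace
//   --TABLE_NAME    my_table
//   --DRIVER_CONF   keyspaces-application.conf
//   --QUERY_FILTER  "event_date = '2024-01-01'"   (optional)
//   --FORMAT        parquet                        (optional, default parquet)
//   --S3_URI        s3://my-bucket/backups         (optional; enables the pre-delete backup)
//   --DISTINCT_KEYS id,event_date                  (optional; defaults to the table's primary key)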