diff --git a/bootcamp/materials/1-dimensional-data-modeling/README.md b/bootcamp/materials/1-dimensional-data-modeling/README.md
index 090a0d7d..d9445efb 100644
--- a/bootcamp/materials/1-dimensional-data-modeling/README.md
+++ b/bootcamp/materials/1-dimensional-data-modeling/README.md
@@ -44,7 +44,21 @@ There are two methods to get Postgres running locally.
 1. Install Postgres
    - For Mac: Follow this **[tutorial](https://daily-dev-tips.com/posts/installing-postgresql-on-a-mac-with-homebrew/)** (Homebrew is really nice for installing on Mac)
    - For Windows: Follow this **[tutorial](https://www.sqlshack.com/how-to-install-postgresql-on-windows/)**
-2. Run this command after replacing **`<username>`** with your computer's username:
+2. Create a database called `postgres` and a user called `postgres` in Postgres, so that the data can be loaded in Step 3.
+    ```sql
+    -- Creates a role
+    CREATE ROLE postgres WITH LOGIN PASSWORD 'admin';
+    ALTER ROLE postgres WITH SUPERUSER CREATEDB;
+
+    -- Drops all tables in the public schema.
+    DROP SCHEMA public CASCADE;
+    CREATE SCHEMA public;
+
+    GRANT ALL ON SCHEMA public TO postgres;
+    GRANT ALL ON SCHEMA public TO public;
+    ```
+
+3. Run this command after replacing **`<username>`** with your computer's username:
 
    ```bash
    pg_restore -U <username> -d postgres data.dump
@@ -52,8 +66,8 @@ There are two methods to get Postgres running locally.
 
    If you have any issues, the syntax is `pg_restore -U [username] -d [database_name] -h [host] -p [port] [backup_file]`
 
-3. Set up DataGrip, DBeaver, or your VS Code extension to point at your locally running Postgres instance.
-4. Have fun querying!
+4. Set up DataGrip, DBeaver, or your VS Code extension to point at your locally running Postgres instance.
+5. Have fun querying!
 
 ### 🐳 **Option 2: Run Postgres and PGAdmin in Docker**
 
diff --git a/bootcamp/materials/3-spark-fundamentals/notebooks/DatasetApi_v2.ipynb b/bootcamp/materials/3-spark-fundamentals/notebooks/DatasetApi_v2.ipynb
new file mode 100644
index 00000000..92712a73
--- /dev/null
+++ b/bootcamp/materials/3-spark-fundamentals/notebooks/DatasetApi_v2.ipynb
@@ -0,0 +1,831 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "b33ba3da-d8bb-4813-bbef-ce847fb37151",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "Intitializing Scala interpreter ..."
+ ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "Spark Web UI available at http://d772c787a954:4041\n", + "SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1734655333256)\n", + "SparkSession available as 'spark'\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "import org.apache.spark.sql.SparkSession\n", + "sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5f152f7f\n" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.SparkSession \n", + "\n", + "val sparkSession = SparkSession.builder.appName(\"Juptyer\").getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "id": "44fd0109-04c6-402a-a8fe-3d7e078c0d22", + "metadata": {}, + "source": [ + "## Events - DATASET" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "6ebe85b4-4dc3-4e95-a8ae-664272ecae73", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "defined class Event\n", + "dummyData: List[Event] = List(Event(None,None,linkedin,eczachly.com,/signup,2023-01-01), Event(None,None,twitter,eczachly.com,/signup,2023-01-01))\n" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// If something is nullabe, you need to wrap the value type in Option[] - this helps enforce assumptions about the pipeline\n", + "case class Event (\n", + " user_id: Option[Integer],\n", + " device_id: Option[Integer],\n", + " referrer: String,\n", + " host: String,\n", + " url: String,\n", + " event_time: String\n", + ")\n", + "\n", + "val dummyData = List(\n", + " Event(user_id=None, device_id=None, referrer=\"linkedin\", host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\"),\n", + " Event(user_id=None, device_id=None, referrer=\"twitter\", host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\")\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "ce3eba90-715d-4467-9247-d7a8ec628206", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "events: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]\n" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// Applying this case class before hand is very powerful, enforces Nullability/non-nullability at runtime!\n", + "val events = sparkSession.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/events.csv\")\n", + " .as[Event]" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "7184d68f-a4cf-4301-862d-327fab56c9b9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res0: org.apache.spark.sql.DataFrame = [summary: string, user_id: string ... 
4 more fields]\n" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "events.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "98fa4941-2c1f-416e-bb66-5b6f85a3d5c9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+---------+--------+--------------------+---+--------------------+\n", + "| user_id|device_id|referrer| host|url| event_time|\n", + "+-----------+---------+--------+--------------------+---+--------------------+\n", + "| 1037710827|532630305| NULL| www.zachwilson.tech| /|2021-03-08 17:27:...|\n", + "| 925588856|532630305| NULL| www.eczachly.com| /|2021-05-10 11:26:...|\n", + "|-1180485268|532630305| NULL|admin.zachwilson....| /|2021-02-17 16:19:...|\n", + "|-1044833855|532630305| NULL| www.zachwilson.tech| /|2021-09-24 15:53:...|\n", + "| 747494706|532630305| NULL| www.zachwilson.tech| /|2021-09-26 16:03:...|\n", + "+-----------+---------+--------+--------------------+---+--------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "events.show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "01c7e46f-e2ce-4cd8-a80d-024adcaee8f9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res2: Long = 404814\n" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "events.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "5e18d496-4a39-4691-8689-d863e3d0d4bd", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "filteredViaDataset: org.apache.spark.sql.Dataset[Event] = [user_id: int, device_id: int ... 4 more fields]\n" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val filteredViaDataset = events.filter(event => event.user_id.isDefined && event.device_id.isDefined)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "75a006e3-be1c-4695-9547-7a8deefd4c29", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res3: Long = 404747\n" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "filteredViaDataset.count()" + ] + }, + { + "cell_type": "markdown", + "id": "422990b5-e789-4b34-891f-0d9e2a66f8b4", + "metadata": {}, + "source": [ + "## Events - DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b2bb886e-0346-4850-90e4-9a3c7dbdc4df", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "filteredViaDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user_id: int, device_id: int ... 
4 more fields]\n" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val filteredViaDataFrame = events.toDF().where($\"user_id\".isNotNull && $\"device_id\".isNotNull)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "e44fa1b6-7919-43e1-ae7f-34bf95157763", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res4: Long = 404747\n" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "filteredViaDataFrame.count()" + ] + }, + { + "cell_type": "markdown", + "id": "2af07054-c54d-461b-9080-2be644f42562", + "metadata": {}, + "source": [ + "## Evnets - SQL" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "e39a1601-d202-480a-a643-6311bc44e0aa", + "metadata": {}, + "outputs": [], + "source": [ + "/*\n", + "Ran this code in another notebook to create the events table\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import expr, col\n", + "spark = SparkSession.builder.appName(\"Jupyter\").getOrCreate()\n", + "events = spark.read.option(\"header\", \"true\").csv(\"/home/iceberg/data/events.csv\").withColumn(\"event_date\", expr(\"DATE_TRUNC('day', event_time)\"))\n", + "df.write.mode(\"overwrite\").saveAsTable(\"bootcamp.events_dump\")\n", + "*/" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "25903287-b065-44be-b303-bc0bf6c4172e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "filteredViaSparkSql: org.apache.spark.sql.DataFrame = [user_id: string, device_id: string ... 5 more fields]\n" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val filteredViaSparkSql = sparkSession.sql(\"SELECT * FROM demo.bootcamp.events_dump WHERE user_id IS NOT NULL AND device_id IS NOT NULL\")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "8bba07f1-acae-4c95-88bf-463fd1210ae1", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res7: Long = 404747\n" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "filteredViaSparkSql.count()" + ] + }, + { + "cell_type": "markdown", + "id": "1f3ac42a-904f-4f79-b24b-427b22d8d75e", + "metadata": {}, + "source": [ + "## Devices" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "id": "b235126b-ef2b-416d-835a-4ca746e2ac75", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "import spark.implicits._\n", + "defined class Device\n", + "devices: org.apache.spark.sql.Dataset[Device] = [device_id: int, browser_type: string ... 2 more fields]\n" + ] + }, + "execution_count": 51, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import spark.implicits._\n", + "\n", + "case class Device (\n", + " device_id: Integer,\n", + " browser_type: String,\n", + " os_type: String,\n", + " device_type: String\n", + ")\n", + "\n", + "val devices = sparkSession.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/devices.csv\")\n", + " .as[Device]" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "id": "dedd9ee8-3306-44f2-934d-e6d1a14c9d39", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res33: org.apache.spark.sql.DataFrame = [summary: string, device_id: string ... 
3 more fields]\n" + ] + }, + "execution_count": 52, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "devices.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "id": "d5c1d10b-52ac-4d19-b5d9-ec8b17743ffd", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+--------------------+-------+------------------+\n", + "| device_id| browser_type|os_type| device_type|\n", + "+-----------+--------------------+-------+------------------+\n", + "|-2147042689| Firefox| Ubuntu| Other|\n", + "|-2146219609| WhatsApp| Other| Spider|\n", + "|-2145574618| Chrome Mobile|Android|Generic Smartphone|\n", + "|-2144707350|Chrome Mobile Web...|Android| Samsung SM-G988B|\n", + "|-2143813999|Mobile Safari UI/...| iOS| iPhone|\n", + "+-----------+--------------------+-------+------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "devices.show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "id": "1aab8651-bb7c-4732-8782-97890b0cacab", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+-----+\n", + "| device_id|count|\n", + "+-----------+-----+\n", + "|-2001648078| 1|\n", + "|-1649034749| 1|\n", + "|-1529640407| 1|\n", + "|-1307533700| 1|\n", + "|-1288794983| 1|\n", + "+-----------+-----+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "devices.groupBy(\"device_id\").count().show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "id": "eb74a33e-3967-4b5d-bb99-0ba0e7d71fb2", + "metadata": {}, + "outputs": [], + "source": [ + "devices.createOrReplaceTempView(\"devices\")\n", + "events.createOrReplaceTempView(\"events\")" + ] + }, + { + "cell_type": "code", + "execution_count": 75, + "id": "7cc81a5d-cbe8-4462-8e9d-590e13b5fe49", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| user_id|device_id|browser_type|os_type|device_type|referrer| host|url| event_time|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| 1037710827|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-03-08 17:27:...|\n", + "| 925588856|532630305| OTHER| Other| Other| NULL| www.eczachly.com| /|2021-05-10 11:26:...|\n", + "|-1180485268|532630305| OTHER| Other| Other| NULL|admin.zachwilson....| /|2021-02-17 16:19:...|\n", + "|-1044833855|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-09-24 15:53:...|\n", + "| 747494706|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-09-26 16:03:...|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "defined class EventWithDeviceInfo\n", + "toUpperCase: (s: String)String\n", + "combinedViaDatasets: org.apache.spark.sql.Dataset[EventWithDeviceInfo] = [user_id: int, device_id: int ... 
7 more fields]\n" + ] + }, + "execution_count": 75, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "case class EventWithDeviceInfo (\n", + " user_id: Integer,\n", + " device_id: Integer,\n", + " browser_type: String,\n", + " os_type: String,\n", + " device_type: String,\n", + " referrer: String,\n", + " host: String,\n", + " url: String,\n", + " event_time: String\n", + ")\n", + "\n", + "def toUpperCase(s: String): String = {\n", + " return s.toUpperCase()\n", + "}\n", + "\n", + "// This will fail if user_id is None\n", + "val combinedViaDatasets = filteredViaDataset\n", + " .joinWith(devices, events(\"device_id\") === devices(\"device_id\"), \"inner\")\n", + " .map{case (event: Event, device: Device) => EventWithDeviceInfo(\n", + " user_id=event.user_id.get,\n", + " device_id=event.device_id.get,\n", + " browser_type=device.browser_type,\n", + " os_type=device.os_type,\n", + " device_type=device.device_type,\n", + " referrer=event.referrer,\n", + " host=event.host,\n", + " url=event.url,\n", + " event_time=event.event_time\n", + " ) }\n", + " .map{ case (row: EventWithDeviceInfo) => \n", + " row.copy(browser_type = row.browser_type.toUpperCase)\n", + " } \n", + " \n", + "combinedViaDatasets.show(5) " + ] + }, + { + "cell_type": "code", + "execution_count": 76, + "id": "c8ecbbd7-03ba-41c2-a403-d7f7025cbef1", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| user_id|device_id|browser_type|os_type|device_type|referrer| host|url| event_time|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| 1037710827|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-03-08 17:27:...|\n", + "| 925588856|532630305| OTHER| Other| Other| NULL| www.eczachly.com| /|2021-05-10 11:26:...|\n", + "|-1180485268|532630305| OTHER| Other| Other| NULL|admin.zachwilson....| /|2021-02-17 16:19:...|\n", + "|-1044833855|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-09-24 15:53:...|\n", + "| 747494706|532630305| OTHER| Other| Other| NULL| www.zachwilson.tech| /|2021-09-26 16:03:...|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "toUpperCaseUdf: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$7025/0x0000000842146840@34176149,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),None,true,true)\n", + "combinedViaDataFrames: org.apache.spark.sql.DataFrame = [user_id: int, device_id: int ... 
7 more fields]\n" + ] + }, + "execution_count": 76, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val toUpperCaseUdf = udf(toUpperCase _)\n", + "\n", + "// DataFrames give up some of the intellisense because you no longer have static typing\n", + "val combinedViaDataFrames = filteredViaDataFrame.as(\"e\")\n", + " //Make sure to use triple equals when using data frames\n", + " .join(devices.as(\"d\"), $\"e.device_id\" === $\"d.device_id\", \"inner\")\n", + " .select(\n", + " col(\"e.user_id\"),\n", + " $\"d.device_id\",\n", + " toUpperCaseUdf($\"d.browser_type\").as(\"browser_type\"),\n", + " $\"d.os_type\",\n", + " $\"d.device_type\",\n", + " $\"e.referrer\",\n", + " $\"e.host\",\n", + " $\"e.url\",\n", + " $\"e.event_time\"\n", + " )\n", + " \n", + "combinedViaDataFrames.show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "23f52419-1bfc-423b-af4a-3ad4398aed01", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| user_id|device_id|browser_type|os_type|device_type|referrer| host|url| event_time|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "| 1037710827|532630305| Other| Other| Other| NULL| www.zachwilson.tech| /|2021-03-08 17:27:...|\n", + "| 925588856|532630305| Other| Other| Other| NULL| www.eczachly.com| /|2021-05-10 11:26:...|\n", + "|-1180485268|532630305| Other| Other| Other| NULL|admin.zachwilson....| /|2021-02-17 16:19:...|\n", + "|-1044833855|532630305| Other| Other| Other| NULL| www.zachwilson.tech| /|2021-09-24 15:53:...|\n", + "| 747494706|532630305| Other| Other| Other| NULL| www.zachwilson.tech| /|2021-09-26 16:03:...|\n", + "+-----------+---------+------------+-------+-----------+--------+--------------------+---+--------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "combinedViaSparkSQL: org.apache.spark.sql.DataFrame = [user_id: string, device_id: int ... 7 more fields]\n" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "//Creating temp views is a good strategy if you're leveraging SparkSQL\n", + "filteredViaSparkSql.createOrReplaceTempView(\"filtered_events\")\n", + "val combinedViaSparkSQL = spark.sql(f\"\"\"\n", + " SELECT \n", + " fe.user_id,\n", + " d.device_id,\n", + " d.browser_type,\n", + " d.os_type,\n", + " d.device_type,\n", + " fe. 
referrer,\n", + " fe.host,\n", + " fe.url,\n", + " fe.event_time\n", + " FROM filtered_events fe \n", + " JOIN devices d ON fe.device_id = d.device_id\n", + "\"\"\")\n", + "\n", + "combinedViaSparkSQL.show(5)" + ] + }, + { + "cell_type": "markdown", + "id": "badbd9ed-15e2-40ad-a39c-b2b370710378", + "metadata": {}, + "source": [ + "---\n", + "# Original Code from Databricks [ Does not work]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "73b5384f-be28-49e3-9bcf-4b9783ba7d91", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.spark.sql.SparkSession \n", + "\n", + "val sparkSession = SparkSession.builder.appName(\"Juptyer\").getOrCreate()\n", + "\n", + "//TODO Illustrate how this fails if you change from Option[String] to String for referrer\n", + "case class Event (\n", + " //Option is a way to handle NULL more gracefully\n", + " user_id: Option[Integer],\n", + " device_id: Option[Integer],\n", + " referrer: Option[String],\n", + " host: String,\n", + " url: String,\n", + " event_time: String\n", + ")\n", + "\n", + "\n", + "dummyData = List(\n", + " Event(user_id=1, device_id=2, referrer=\"linkedin\", host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\"),\n", + " Event(user_id=3, device_id=7, referrer=\"twitter\", host=\"eczachly.com\", url=\"/signup\", event_time=\"2023-01-01\")\n", + " )\n", + "\n", + "//TODO Illustrate how this fails if you change from Option[Long] to Long\n", + "case class Device (\n", + " device_id: Integer,\n", + " browser_type: String,\n", + " os_type: String,\n", + " device_type: String\n", + ")\n", + "\n", + "case class EventWithDeviceInfo (\n", + " user_id: Integer,\n", + " device_id: Integer,\n", + " browser_type: String,\n", + " os_type: String,\n", + " device_type: String,\n", + " referrer: String,\n", + " host: String,\n", + " url: String,\n", + " event_time: String\n", + ")\n", + "\n", + "// When should you use each type?\n", + "import sparkSession.implicits._\n", + "\n", + "// Applying this case class before hand is very powerful, enforces Nullability/non-nullability at runtime!\n", + "val events: Dataset[Event] = sparkSession.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/events.csv\")\n", + " .as[Event]\n", + "\n", + "val devices: Dataset[Device] = sparkSession.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/devices.csv\")\n", + " .as[Device]\n", + "\n", + "devices.createOrReplaceTempView(\"devices\")\n", + "events.createOrReplaceTempView(\"events\")\n", + "\n", + "// For simple transformations, you can see that these approaches are very similar. 
Dataset is winning slightly because of the quality enforcement\n", + "val filteredViaDataset = events.filter(event => event.user_id.isDefined && event.device_id.isDefined)\n", + "val filteredViaDataFrame = events.toDF().where($\"user_id\".isNotNull && $\"device_id\".isNotNull)\n", + "val filteredViaSparkSql = sparkSession.sql(\"SELECT * FROM events WHERE user_id IS NOT NULL AND device_id IS NOT NULL\")\n", + "\n", + "\n", + "// This will fail if user_id is None\n", + "val combinedViaDatasets = filteredViaDataset\n", + " .joinWith(devices, events(\"device_id\") === devices(\"device_id\"), \"inner\")\n", + " .map{ case (event: Event, device: Device) => EventWithDeviceInfo(\n", + " user_id=event.user_id.get,\n", + " device_id=device.device_id,\n", + " browser_type=device.browser_type,\n", + " os_type=device.os_type,\n", + " device_type=device.device_type,\n", + " referrer=event.referrer,\n", + " host=event.host,\n", + " url=event.url,\n", + " event_time=event.event_time\n", + " ) }\n", + " .map( case (row: EventWithDeviceInfo) => {\n", + " row.browser_type = toUpperCase(row.browser_type)\n", + " return row\n", + " })\n", + "\n", + "\n", + "\n", + "\n", + "// DataFrames give up some of the intellisense because you no longer have static typing\n", + "val combinedViaDataFrames = filteredViaDataFrame.as(\"e\")\n", + " //Make sure to use triple equals when using data frames\n", + " .join(devices.as(\"d\"), $\"e.device_id\" === $\"d.device_id\", \"inner\")\n", + " .select(\n", + " $\"e.user_id\",\n", + " $\"d.device_id\",\n", + " $\"d.browser_type\",\n", + " $\"d.os_type\",\n", + " $\"d.device_type\",\n", + " $\"e.referrer\",\n", + " $\"e.host\",\n", + " $\"e.url\",\n", + " $\"e.event_time\"\n", + " )\n", + "\n", + "//Creating temp views is a good strategy if you're leveraging SparkSQL\n", + "filteredViaSparkSql.createOrReplaceTempView(\"filtered_events\")\n", + "val combinedViaSparkSQL = spark.sql(f\"\"\"\n", + " SELECT \n", + " fe.user_id,\n", + " d.device_id,\n", + " d.browser_type,\n", + " d.os_type,\n", + " d.device_type,\n", + " fe. 
referrer,\n", + " fe.host,\n", + " fe.url,\n", + " fe.event_time\n", + " FROM filtered_events fe \n", + " JOIN devices d ON fe.device_id = d.device_id\n", + "\"\"\")\n", + "\n", + "combinedViaDatasets.take(5) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "203ac70d-b0e5-474b-8c82-b1bae2624d51", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "spylon-kernel", + "language": "scala", + "name": "spylon-kernel" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "help_links": [ + { + "text": "MetaKernel Magics", + "url": "https://metakernel.readthedocs.io/en/latest/source/README.html" + } + ], + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "0.4.1" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/bootcamp/materials/3-spark-fundamentals/notebooks/bucket-joins-in-iceberg_v2.ipynb b/bootcamp/materials/3-spark-fundamentals/notebooks/bucket-joins-in-iceberg_v2.ipynb new file mode 100644 index 00000000..b1f37a87 --- /dev/null +++ b/bootcamp/materials/3-spark-fundamentals/notebooks/bucket-joins-in-iceberg_v2.ipynb @@ -0,0 +1,776 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "970275c6-f9b1-401f-ae18-ce6823cc4771", + "metadata": {}, + "source": [ + "### Giving more resources to SPARK" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "678c8ea9-03b2-4c7e-9773-30d9d5443bb4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Intitializing Scala interpreter ..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "Spark Web UI available at http://d772c787a954:4041\n", + "SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1734576327409)\n", + "SparkSession available as 'spark'\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "import org.apache.spark.sql.SparkSession\n", + "import org.apache.spark.sql.functions.col\n", + "import org.apache.spark.storage.StorageLevel\n", + "spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@73d0495\n" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.SparkSession\n", + "import org.apache.spark.sql.functions.{col}\n", + "import org.apache.spark.storage.StorageLevel\n", + "\n", + "val spark = SparkSession.builder()\n", + " .appName(\"IcebergTableManagement\") \n", + " .config(\"spark.executor.memory\", \"32g\")\n", + " .config(\"spark.driver.memory\", \"32g\")\n", + " .config(\"spark.sql.shuffle.partitions\", \"200\") // Fine for large datasets\n", + " .config(\"spark.sql.files.maxPartitionBytes\", \"134217728\") // Optional: 128 MB is default\n", + " .config(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\") // Optional: Disable broadcast join\n", + " .config(\"spark.dynamicAllocation.enabled\", \"true\") // Helps with resource allocation\n", + " .config(\"spark.dynamicAllocation.minExecutors\", \"1\") // Ensure minimum resources\n", + " .config(\"spark.dynamicAllocation.maxExecutors\", \"50\") // Scalable resource allocation\n", + " .getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "id": "93497a33-06bd-46fd-b9bd-76dad95ec182", + "metadata": {}, + "source": [ + "### Loading Data" + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "id": "596b9188-ab6d-4550-bfbe-748f69e5ef81", + 
"metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "import org.apache.spark.sql.functions.{broadcast, split, lit}\n", + "matchesBucketed: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]\n", + "matchDetailsBucketed: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]\n" + ] + }, + "execution_count": 62, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// In python use: from pyspark.sql.functions import broadcast, split, lit\n", + "import org.apache.spark.sql.functions.{broadcast, split, lit}\n", + "\n", + "\n", + "val matchesBucketed = spark.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/matches.csv\")\n", + " \n", + "val matchDetailsBucketed = spark.read.option(\"header\", \"true\")\n", + " .option(\"inferSchema\", \"true\")\n", + " .csv(\"/home/iceberg/data/match_details.csv\")" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "9eafc6a3-2427-436c-9952-504dcb565c98", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+------------+--------------------+-------------------+\n", + "| match_id|is_team_game| playlist_id| completion_date|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "|11de1a94-8d07-416...| true|f72e0ef0-7c4a-430...|2016-02-22 00:00:00|\n", + "|d3643e71-3e51-43e...| false|d0766624-dbd7-453...|2016-02-14 00:00:00|\n", + "|d78d2aae-36e4-48a...| true|f72e0ef0-7c4a-430...|2016-03-24 00:00:00|\n", + "|b440069e-ec5f-4f5...| true|f72e0ef0-7c4a-430...|2015-12-23 00:00:00|\n", + "|1dd475fc-ee6b-4e1...| true|0e39ead4-383b-445...|2016-04-07 00:00:00|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "res41: Long = 24025\n" + ] + }, + "execution_count": 63, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "matchesBucketed.select($\"match_id\", $\"is_team_game\", $\"playlist_id\", $\"completion_date\").show(5)\n", + "matchesBucketed.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 64, + "id": "04af2b67-fce6-49fa-ab06-e6e489186fdd", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+---------------+------------------+-------------------+\n", + "| match_id|player_gamertag|player_total_kills|player_total_deaths|\n", + "+--------------------+---------------+------------------+-------------------+\n", + "|71d79b23-4143-435...| taterbase| 6| 13|\n", + "|71d79b23-4143-435...| SuPeRSaYaInG0D| 7| 18|\n", + "|71d79b23-4143-435...| EcZachly| 12| 10|\n", + "|71d79b23-4143-435...| johnsnake04| 13| 9|\n", + "|71d79b23-4143-435...| Super Mac Bros| 13| 15|\n", + "+--------------------+---------------+------------------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "res42: Long = 151761\n" + ] + }, + "execution_count": 64, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "matchDetailsBucketed.select($\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\").show(5)\n", + "matchDetailsBucketed.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "9b382e22-529e-4441-bfc7-949a00f9c5b7", + "metadata": {}, + "outputs": [ + 
{ + "data": { + "text/plain": [ + "res43: org.apache.spark.sql.DataFrame = [summary: string, match_id: string ... 3 more fields]\n" + ] + }, + "execution_count": 65, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "matchDetailsBucketed.select($\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\").describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "id": "94cbc8ef-ce3b-4eb9-a435-1dfe2602d789", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "matches: org.apache.spark.sql.DataFrame = [match_id: string, is_team_game: boolean ... 2 more fields]\n", + "matchDetails: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 2 more fields]\n" + ] + }, + "execution_count": 66, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val matches = matchesBucketed.select($\"match_id\", $\"is_team_game\", $\"playlist_id\", $\"completion_date\")\n", + "val matchDetails = matchDetailsBucketed.select($\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\")" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "id": "6ffe323c-8d5a-4365-bef7-36cf27522fb3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+------------+--------------------+-------------------+\n", + "| match_id|is_team_game| playlist_id| completion_date|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "|11de1a94-8d07-416...| true|f72e0ef0-7c4a-430...|2016-02-22 00:00:00|\n", + "|d3643e71-3e51-43e...| false|d0766624-dbd7-453...|2016-02-14 00:00:00|\n", + "|d78d2aae-36e4-48a...| true|f72e0ef0-7c4a-430...|2016-03-24 00:00:00|\n", + "|b440069e-ec5f-4f5...| true|f72e0ef0-7c4a-430...|2015-12-23 00:00:00|\n", + "|1dd475fc-ee6b-4e1...| true|0e39ead4-383b-445...|2016-04-07 00:00:00|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "only showing top 5 rows\n", + "\n", + "+--------------------+---------------+------------------+-------------------+\n", + "| match_id|player_gamertag|player_total_kills|player_total_deaths|\n", + "+--------------------+---------------+------------------+-------------------+\n", + "|71d79b23-4143-435...| taterbase| 6| 13|\n", + "|71d79b23-4143-435...| SuPeRSaYaInG0D| 7| 18|\n", + "|71d79b23-4143-435...| EcZachly| 12| 10|\n", + "|71d79b23-4143-435...| johnsnake04| 13| 9|\n", + "|71d79b23-4143-435...| Super Mac Bros| 13| 15|\n", + "+--------------------+---------------+------------------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "matches.show(5)\n", + "matchDetails.show(5)" + ] + }, + { + "cell_type": "markdown", + "id": "9deedeef-8157-4a05-a6ce-36200c0b6954", + "metadata": {}, + "source": [ + "### Loading data from CSV -> Iceberg Table with buckets" + ] + }, + { + "cell_type": "markdown", + "id": "22e65974-82e0-434f-a0d3-50f42e952a7d", + "metadata": {}, + "source": [ + "**Table 1 - Matches**" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "cc0312ff-aab6-40ca-8bf4-b7a628fdc4e9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res2: org.apache.spark.sql.DataFrame = []\n" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"\"\"DROP TABLE IF EXISTS bootcamp.matches_bucketed;\"\"\")" + ] + }, + { + "cell_type": "code", + 
"execution_count": 6, + "id": "da358566-1f8e-4e5d-b142-3de3d2219963", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "bucketedDDL: String =\n", + "\"\n", + "CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (\n", + " match_id STRING,\n", + " is_team_game BOOLEAN,\n", + " playlist_id STRING,\n", + " completion_date TIMESTAMP\n", + " )\n", + " USING iceberg\n", + " PARTITIONED BY (completion_date, bucket(16, match_id));\n", + " \"\n", + "res3: org.apache.spark.sql.DataFrame = []\n" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val bucketedDDL = \"\"\"\n", + "CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (\n", + " match_id STRING,\n", + " is_team_game BOOLEAN,\n", + " playlist_id STRING,\n", + " completion_date TIMESTAMP\n", + " )\n", + " USING iceberg\n", + " PARTITIONED BY (completion_date, bucket(16, match_id));\n", + " \"\"\"\n", + "spark.sql(bucketedDDL)" + ] + }, + { + "cell_type": "markdown", + "id": "440f5148-8a30-4cef-afdc-5e1cfb3b4f14", + "metadata": {}, + "source": [ + "**Directly trying to save does not work!**" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "305013f6-dcab-4b77-9fe5-ec3e767aa0f1", + "metadata": {}, + "outputs": [], + "source": [ + "//matchesBucketed.select($\"match_id\", $\"is_team_game\", $\"playlist_id\", $\"completion_date\")\n", + "//.write.mode(\"append\")\n", + "//.partitionBy(\"completion_date\")\n", + "//.bucketBy(4, \"match_id\")\n", + "//.saveAsTable(\"bootcamp.matches_bucketed\")" + ] + }, + { + "cell_type": "markdown", + "id": "355243f4-f88f-4351-a6dc-50051f95c3cb", + "metadata": {}, + "source": [ + "**Processing in Batches**" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "5c387f1c-3fe2-4499-b412-f502539fb340", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "distinctDates: Array[org.apache.spark.sql.Row] = Array([2016-03-13 00:00:00.0], [2016-03-11 00:00:00.0], [2016-03-10 00:00:00.0], [2016-01-30 00:00:00.0], [2016-03-27 00:00:00.0], [2016-04-10 00:00:00.0], [2016-01-18 00:00:00.0], [2016-02-01 00:00:00.0], [2015-12-14 00:00:00.0], [2016-02-03 00:00:00.0], [2016-04-30 00:00:00.0], [2016-03-05 00:00:00.0], [2016-04-15 00:00:00.0], [2016-05-21 00:00:00.0], [2015-10-31 00:00:00.0], [2016-01-22 00:00:00.0], [2016-02-09 00:00:00.0], [2016-03-17 00:00:00.0], [2016-04-04 00:00:00.0], [2016-05-08 00:00:00.0], [2016-01-21 00:00:00.0], [2015-10-28 00:00:00.0], [2016-03-30 00:00:00.0], [2016-05-03 00:00:00.0], [2016-02-04 00:00:00.0], [2015-11-25 00:00:00.0], [2016-01-13 00:00:00.0], [2016-04-29 00:00:00.0], [2016-05-18 00:00:00.0], [2016-03-24 00:00...\n" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// Get distinct completion dates\n", + "val distinctDates = matchesBucketed.select(\"completion_date\").distinct().collect()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "577b60c2-d395-4483-92b4-24a36bcbc8ed", + "metadata": {}, + "outputs": [], + "source": [ + "// Process data in chunks based on completion_date\n", + "distinctDates.foreach { row =>\n", + " val date = row.getAs[java.sql.Timestamp](\"completion_date\")\n", + " val filteredMatches = matchesBucketed.filter(col(\"completion_date\") === date)\n", + " \n", + " // Repartition and persist the filtered data\n", + " val optimizedMatches = filteredMatches\n", + " .select($\"match_id\", $\"is_team_game\", $\"playlist_id\", $\"completion_date\")\n", + " 
.repartition(16, $\"match_id\")\n", + " .persist(StorageLevel.MEMORY_AND_DISK)\n", + " \n", + " optimizedMatches.write\n", + " .mode(\"append\")\n", + " .bucketBy(16, \"match_id\")\n", + " .partitionBy(\"completion_date\")\n", + " .saveAsTable(\"bootcamp.matches_bucketed\")\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "c15599ae-f84e-4f10-a1f8-5d0df99637e4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+------------+--------------------+-------------------+\n", + "| match_id|is_team_game| playlist_id| completion_date|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "|faf37c7f-3f3a-4f0...| true|b617e24f-71aa-432...|2016-05-16 00:00:00|\n", + "|cbb50ffc-714d-438...| false|d0766624-dbd7-453...|2016-09-26 00:00:00|\n", + "|d6aea3be-8d6f-408...| true|2323b76a-db98-4e0...|2016-08-13 00:00:00|\n", + "|9be0f082-b7fa-47f...| true|892189e9-d712-4bd...|2015-11-12 00:00:00|\n", + "|4a7fcf11-1d90-4c9...| true|2323b76a-db98-4e0...|2016-09-22 00:00:00|\n", + "+--------------------+------------+--------------------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "// Verify the data in the table\n", + "spark.sql(\"SELECT * FROM bootcamp.matches_bucketed\").show(5)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "beac45af-7755-4195-9cd7-59a1d4de5934", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+\n", + "|num_files|\n", + "+---------+\n", + "| 3665|\n", + "+---------+\n", + "\n" + ] + } + ], + "source": [ + "// Verify number of files\n", + "spark.sql(\"SELECT COUNT(1) as num_files FROM bootcamp.matches_bucketed.files\").show()" + ] + }, + { + "cell_type": "markdown", + "id": "0af254c8-8539-46f6-a94b-79820d2fbf70", + "metadata": {}, + "source": [ + "**Table 2 - Match Details**" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "0f0b5f60-965b-431d-93c8-d2714098552c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "res8: org.apache.spark.sql.DataFrame = []\n" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"\"\"DROP TABLE IF EXISTS bootcamp.match_details_bucketed;\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "b279c388-ea54-4c97-a14e-736e5aebb57e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "bucketedDetailsDDL: String =\n", + "\"\n", + "CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (\n", + " match_id STRING,\n", + " player_gamertag STRING,\n", + " player_total_kills INTEGER,\n", + " player_total_deaths INTEGER\n", + ")\n", + "USING iceberg\n", + "PARTITIONED BY (bucket(16, match_id));\n", + "\"\n", + "res9: org.apache.spark.sql.DataFrame = []\n" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val bucketedDetailsDDL = \"\"\"\n", + "CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (\n", + " match_id STRING,\n", + " player_gamertag STRING,\n", + " player_total_kills INTEGER,\n", + " player_total_deaths INTEGER\n", + ")\n", + "USING iceberg\n", + "PARTITIONED BY (bucket(16, match_id));\n", + "\"\"\"\n", + "spark.sql(bucketedDetailsDDL)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "id": "cc72b083-df90-48a5-8b61-c951666b13e8", + "metadata": {}, + "outputs": [], + 
"source": [ + "//matchDetailsBucketed.select($\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\")\n", + "//.write.mode(\"append\")\n", + "//.bucketBy(16, \"match_id\").saveAsTable(\"bootcamp.match_details_bucketed\")" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "id": "c017e9b7-1bc1-46d1-aa04-2e5ddc849155", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "someField: Array[org.apache.spark.sql.Row] = Array([31], [34], [28], [27], [26], [44], [12], [22], [47], [1], [13], [6], [16], [3], [20], [57], [48], [5], [19], [64], [41], [15], [37], [9], [17], [35], [4], [8], [23], [39], [7], [10], [50], [45], [38], [25], [24], [29], [21], [32], [11], [33], [14], [42], [2], [30], [46], [0], [18], [36], [52], [40], [94], [54], [43], [61], [59], [49], [51], [63], [82], [62], [60], [75], [109], [58], [83], [67], [69], [56], [71], [53], [76], [96], [55], [73], [90], [66], [65], [68])\n" + ] + }, + "execution_count": 45, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val someField = matchDetails.select(\"player_total_kills\").distinct().collect()" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "id": "5f718dc1-4a4a-4b35-a23b-93af3d04de5c", + "metadata": {}, + "outputs": [], + "source": [ + "// Process data in chunks based on completion_date\n", + "someField.foreach { row =>\n", + " val fieldValue = row.getAs[Int](\"player_total_kills\") \n", + " val filteredData = matchDetailsBucketed.filter(col(\"player_total_kills\") === fieldValue)\n", + "\n", + " // Repartition and persist the filtered data\n", + " val optimizedData = filteredData\n", + " .select($\"match_id\", $\"player_gamertag\", $\"player_total_kills\", $\"player_total_deaths\")\n", + " .repartition(16, $\"match_id\")\n", + " .persist(StorageLevel.MEMORY_AND_DISK)\n", + " \n", + " optimizedData.write\n", + " .mode(\"append\")\n", + " .bucketBy(16, \"match_id\")\n", + " //.partitionBy(\"completion_date\")\n", + " .saveAsTable(\"bootcamp.match_details_bucketed\")\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "id": "32c996c0-3151-466f-9d33-a57e896ee591", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "== Physical Plan ==\n", + "AdaptiveSparkPlan isFinalPlan=false\n", + "+- SortMergeJoin [match_id#48578], [match_id#48582], Inner\n", + " :- Sort [match_id#48578 ASC NULLS FIRST], false, 0\n", + " : +- Exchange hashpartitioning(match_id#48578, 200), ENSURE_REQUIREMENTS, [plan_id=21199]\n", + " : +- BatchScan demo.bootcamp.match_details_bucketed[match_id#48578, player_gamertag#48579, player_total_kills#48580, player_total_deaths#48581] demo.bootcamp.match_details_bucketed (branch=null) [filters=match_id IS NOT NULL, groupedBy=] RuntimeFilters: []\n", + " +- Sort [match_id#48582 ASC NULLS FIRST], false, 0\n", + " +- Exchange hashpartitioning(match_id#48582, 200), ENSURE_REQUIREMENTS, [plan_id=21200]\n", + " +- BatchScan demo.bootcamp.matches_bucketed[match_id#48582, is_team_game#48583, playlist_id#48584, completion_date#48585] demo.bootcamp.matches_bucketed (branch=null) [filters=completion_date IS NOT NULL, completion_date = 1451606400000000, match_id IS NOT NULL, groupedBy=] RuntimeFilters: []\n", + "\n", + "\n" + ] + } + ], + "source": [ + "// Shutting off Broadcast join to have Spark pick Bucketing \n", + "spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\")\n", + "\n", + "//matchesBucketed.createOrReplaceTempView(\"matches\")\n", + 
"//matchDetailsBucketed.createOrReplaceTempView(\"match_details\")\n", + "\n", + "spark.sql(\"\"\"\n", + " SELECT * FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.matches_bucketed md \n", + " ON mdb.match_id = md.match_id\n", + " AND md.completion_date = DATE('2016-01-01') \n", + "\"\"\").explain()" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "id": "e596c02a-cda3-4a6e-9d84-5ef3b7e25a83", + "metadata": {}, + "outputs": [], + "source": [ + "//Not needed to save to table. we can join on csv files directly.\n", + "//matches.write.mode(\"overwrite\").saveAsTable(\"bootcamp.matches_raw\")\n", + "//matchDetails.write.mode(\"overwrite\").saveAsTable(\"bootcamp.match_details_raw\")" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "id": "e62b8eb1-91e0-42d0-b71b-11162b068a39", + "metadata": {}, + "outputs": [], + "source": [ + "matchesBucketed.createOrReplaceTempView(\"matches\")\n", + "matchDetailsBucketed.createOrReplaceTempView(\"match_details\")" + ] + }, + { + "cell_type": "code", + "execution_count": 70, + "id": "18df6c19-3aa1-48fd-9960-0fc2ac2b5084", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "== Physical Plan ==\n", + "AdaptiveSparkPlan isFinalPlan=false\n", + "+- SortMergeJoin [match_id#48790], [match_id#48753], Inner\n", + " :- Sort [match_id#48790 ASC NULLS FIRST], false, 0\n", + " : +- Exchange hashpartitioning(match_id#48790, 200), ENSURE_REQUIREMENTS, [plan_id=21459]\n", + " : +- Filter isnotnull(match_id#48790)\n", + " : +- FileScan csv [match_id#48790,player_gamertag#48791,previous_spartan_rank#48792,spartan_rank#48793,previous_total_xp#48794,total_xp#48795,previous_csr_tier#48796,previous_csr_designation#48797,previous_csr#48798,previous_csr_percent_to_next_tier#48799,previous_csr_rank#48800,current_csr_tier#48801,current_csr_designation#48802,current_csr#48803,current_csr_percent_to_next_tier#48804,current_csr_rank#48805,player_rank_on_team#48806,player_finished#48807,player_average_life#48808,player_total_kills#48809,player_total_headshots#48810,player_total_weapon_damage#48811,player_total_shots_landed#48812,player_total_melee_kills#48813,... 
12 more fields] Batched: false, DataFilters: [isnotnull(match_id#48790)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/match_details.csv], PartitionFilters: [], PushedFilters: [IsNotNull(match_id)], ReadSchema: struct previous_quality_class THEN 1 - WHEN is_active <> previous_is_active THEN 1 - ELSE 0 - END AS change_indicator - FROM with_previous -), -with_streaks AS ( - SELECT * - , SUM(change_indicator) OVER (PARTITION BY actorid ORDER BY current_year) AS streak_identifier - FROM with_indicators -) -SELECT - actor - , actorid - , quality_class - , is_active - , MIN(current_year) AS start_year - , MAX(current_year) AS end_year - , 2020 AS current_year -FROM with_streaks -GROUP BY actor - , actorid - , quality_class - , is_active - , streak_identifier -ORDER BY actor - , streak_identifier - -""" - - -def do_actor_scd_transformation(spark, dataframe): - dataframe.createOrReplaceTempView("actors") - return spark.sql(query) - -def main(): - spark = SparkSession.builder \ - .master("local") \ - .appName("actors_scd") \ - .getOrCreate() - output_df = do_actor_scd_transformation(spark, spark.table("actors")) +from pyspark.sql import SparkSession + +query = """ + +WITH with_previous AS ( + SELECT actor + , actorid + , current_year + , quality_class + , is_active + , LAG(quality_class, 1) OVER (PARTITION BY actorid ORDER BY current_year) AS previous_quality_class + , LAG(is_active, 1) OVER (PARTITION BY actorid ORDER BY current_year) AS previous_is_active + FROM actors + WHERE current_year <= 2021 +), +with_indicators AS ( + SELECT * + , CASE + WHEN quality_class <> previous_quality_class THEN 1 + WHEN is_active <> previous_is_active THEN 1 + ELSE 0 + END AS change_indicator + FROM with_previous +), +with_streaks AS ( + SELECT * + , SUM(change_indicator) OVER (PARTITION BY actorid ORDER BY current_year) AS streak_identifier + FROM with_indicators +) +SELECT + actor + , actorid + , quality_class + , is_active + , MIN(current_year) AS start_year + , MAX(current_year) AS end_year + , cast(2021 as BIGINT) AS current_year +FROM with_streaks +GROUP BY actor + , actorid + , quality_class + , is_active + , streak_identifier +ORDER BY actor + , streak_identifier + +""" + + +def do_actor_scd_transformation(spark, dataframe): + dataframe.createOrReplaceTempView("actors") + return spark.sql(query) + +def main(): + spark = SparkSession.builder \ + .master("local") \ + .appName("actors_scd") \ + .getOrCreate() + output_df = do_actor_scd_transformation(spark, spark.table("actors")) output_df.write.mode("overwrite").insertInto("actors_scd") \ No newline at end of file diff --git a/bootcamp/materials/3-spark-fundamentals/src/tests/test_actors_scd.py b/bootcamp/materials/3-spark-fundamentals/src/tests/test_actors_scd.py index 4b801fbd..7b83b3b5 100644 --- a/bootcamp/materials/3-spark-fundamentals/src/tests/test_actors_scd.py +++ b/bootcamp/materials/3-spark-fundamentals/src/tests/test_actors_scd.py @@ -1,27 +1,27 @@ -from chispa.dataframe_comparer import * -from ..jobs.actors_scd_job import do_actor_scd_transformation -from collections import namedtuple - -ActorYear = namedtuple("ActorYear", "actor current_year quality_class") -ActorScd = namedtuple("ActorScd", "actor quality_class start_year end_year") - - -def test_scd_generation(spark): - source_data = [ - ActorYear("Meat Loaf", 2018, 'Good'), - ActorYear("Meat Loaf", 2019, 'Good'), - ActorYear("Meat Loaf", 2020, 'Bad'), - ActorYear("Meat Loaf", 2021, 'Bad'), - ActorYear("Skid Markel", 2020, 'Bad'), - ActorYear("Skid Markel", 2021, 
'Bad') - ] - source_df = spark.createDataFrame(source_data) - - actual_df = do_actor_scd_transformation(spark, source_df) - expected_data = [ - ActorScd("Meat Loaf", 'Good', 2018, 2019), - ActorScd("Meat Loaf", 'Bad', 2020, 2021), - ActorScd("Skid Markel", 'Bad', 2020, 2021) - ] - expected_df = spark.createDataFrame(expected_data) - assert_df_equality(actual_df, expected_df) \ No newline at end of file +from chispa.dataframe_comparer import * +from ..jobs.actors_scd_job import do_actor_scd_transformation +from collections import namedtuple + +ActorYear = namedtuple("ActorYear", "actor actorid current_year quality_class is_active") +ActorScd = namedtuple("ActorScd", "actor actorid quality_class is_active start_year end_year current_year") + + +def test_scd_generation(spark): + source_data = [ + ActorYear("Meat Loaf", 123, 2018, 'Good', 1), + ActorYear("Meat Loaf", 123, 2019, 'Good', 1), + ActorYear("Meat Loaf", 123, 2020, 'Bad', 1), + ActorYear("Meat Loaf", 123, 2021, 'Bad', 1), + ActorYear("Skid Markel", 321, 2020, 'Bad',1), + ActorYear("Skid Markel", 321, 2021, 'Bad',1) + ] + source_df = spark.createDataFrame(source_data) + + actual_df = do_actor_scd_transformation(spark, source_df) + expected_data = [ + ActorScd("Meat Loaf", 123, 'Good', 1, 2018, 2019,2021), + ActorScd("Meat Loaf", 123, 'Bad', 1, 2020, 2021,2021), + ActorScd("Skid Markel", 321, 'Bad', 1, 2020, 2021,2021) + ] + expected_df = spark.createDataFrame(expected_data) + assert_df_equality(actual_df, expected_df, ignore_nullable=True) \ No newline at end of file
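
Note on running the updated test: `test_scd_generation` takes a `spark` argument, so it relies on a pytest fixture that this patch does not show (typically a `conftest.py` next to the tests). A minimal sketch of such a fixture, assuming a local-mode SparkSession shared across the test session, could look like this; the repo's actual `conftest.py` may differ:

```python
# conftest.py — hypothetical fixture sketch for the chispa-based tests
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # One local SparkSession reused by every test in the session
    session = (
        SparkSession.builder
        .master("local[*]")
        .appName("actors_scd_tests")
        .getOrCreate()
    )
    yield session
    session.stop()
```

With a fixture like this in place, pytest can discover `test_scd_generation` and compare the SCD output against the expected seven-column rows via `assert_df_equality(..., ignore_nullable=True)`.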