Commit ffade8e

docs: Updated notebook for streaming

1 parent 400c041 commit ffade8e

File tree

1 file changed: +97 −41 lines changed

redis-spark-notebook.ipynb

Lines changed: 97 additions & 41 deletions
@@ -29,7 +29,7 @@
 "1. Choose **Maven** as your source and click **Search Packages**\n",
 "1. Enter `redis-spark-connector` and select `com.redis:redis-spark-connector:x.y.z`\n",
 "1. Finalize by clicking **Install** <br/>\n",
-"Want to explore the connector's full capabilities? Check the [detailed documentation](https://redis-field-engineering.github.io/redis-spark)\n"
+"Want to explore the connector's full capabilities? Check the [detailed documentation](https://redis-field-engineering.github.io/redis-spark)"
 ]
 },
 {
@@ -57,9 +57,7 @@
 "## Configuring Spark with Redis Connection Details\n",
 "\n",
 "1. From your Redis Cloud database dashboard, find your connection endpoint under **Connect**. The string follows this pattern: `redis://<user>:<pass>@<host>:<port>`\n",
-"1. In Databricks, open your cluster settings and locate **Advanced Options**. Under **Spark** in the **Spark config** text area, add your Redis connection string as both `spark.redis.read.connection.uri redis://...` and `spark.redis.write.connection.uri redis://...` parameters. This configuration applies to all notebooks using this cluster. Note that it is recommended to use secrets to store sensitive Redis URIs. Refer to the [Redis Spark documentation](https://redis-field-engineering.github.io/redis-spark/#_databricks) for more details.\n",
-"\n",
-"\n"
+"1. In Databricks, open your cluster settings and locate **Advanced Options**. Under **Spark** in the **Spark config** text area, add your Redis connection string as both `spark.redis.read.connection.uri redis://...` and `spark.redis.write.connection.uri redis://...` parameters. This configuration applies to all notebooks using this cluster. Note that it is recommended to use secrets to store sensitive Redis URIs. Refer to the [Redis Spark documentation](https://redis-field-engineering.github.io/redis-spark/#_databricks) for more details."
 ]
 },
 {
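For reference, the secrets recommendation above can also be exercised from a notebook cell. A minimal sketch, assuming a hypothetical secret scope `redis` and key `redis-uri`; `dbutils.secrets.get` is the standard Databricks API, but whether the connector honors session-level conf in addition to the cluster-level Spark config is an assumption here:

```python
# Hedged sketch: read the Redis URI from a Databricks secret scope rather than
# pasting it into the cluster config. Scope and key names are hypothetical.
redis_uri = dbutils.secrets.get(scope="redis", key="redis-uri")

# Assumes the connector also picks these up from session conf; the documented
# approach is the cluster-level Spark config described above.
spark.conf.set("spark.redis.read.connection.uri", redis_uri)
spark.conf.set("spark.redis.write.connection.uri", redis_uri)
```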
@@ -77,7 +75,7 @@
 "source": [
 "## Reading from Redis\n",
 "\n",
-"To read data from Redis run the following code:\n"
+"To read data from Redis, use the following line."
 ]
 },
 {
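The read cell itself is untouched by this commit, so only its `display(df)` context appears below. For orientation, it presumably amounts to something like this minimal sketch; the exact shape is an assumption, and with no options given the connector would fall back to the connection URI configured above:

```python
# Minimal read sketch (assumed shape of the unchanged cell in this notebook).
df = spark.read.format("redis").load()
display(df)
```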
@@ -102,6 +100,45 @@
 "display(df)"
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {
+"application/vnd.databricks.v1+cell": {
+"cellMetadata": {},
+"inputWidgets": {},
+"nuid": "63658b45-2684-440f-9d2d-b0d84bb4af8f",
+"showTitle": false,
+"tableResultSettingsMap": {},
+"title": ""
+}
+},
+"source": [
+"## Writing to Redis\n",
+"\n",
+"Let's use the `df` data we imported earlier and write it back to Redis as JSON. Refresh **Redis Insight** and notice the new JSON keys prefixed with `spark:nobel`."
+]
+},
+{
+"cell_type": "code",
+"execution_count": 0,
+"metadata": {
+"application/vnd.databricks.v1+cell": {
+"cellMetadata": {
+"byteLimit": 2048000,
+"rowLimit": 10000
+},
+"inputWidgets": {},
+"nuid": "37786ad3-b0ec-49a9-8839-75ff907e4cae",
+"showTitle": false,
+"tableResultSettingsMap": {},
+"title": ""
+}
+},
+"outputs": [],
+"source": [
+"df.write.format(\"redis\").option(\"type\", \"json\").option(\"keyspace\", \"spark:nobel\").option(\"key\", \"id\").mode(\"append\").save()"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {
@@ -117,7 +154,7 @@
 "source": [
 "## Reading from Redis in Streaming Mode\n",
 "\n",
-"The following code reads data from a Redis stream, appending data to a streaming in-memory dataframe."
+"The following code reads data from the Redis stream `nobels`, appending data to a streaming in-memory dataframe."
 ]
 },
 {
@@ -138,31 +175,31 @@
 },
 "outputs": [],
 "source": [
+"streamDf = spark.readStream.format(\"redis\").option(\"type\", \"stream\").option(\"streamKey\", \"nobels\").load()\n",
+"query = streamDf.writeStream.format(\"memory\").queryName(\"nobels\").outputMode(\"append\").trigger(processingTime=\"1 second\").start()\n",
+"\n",
 "import time\n",
-"streamDf = spark.readStream.format(\"redis\").option(\"type\", \"stream\").option(\"streamKey\", \"stream:nobels\").load()\n",
-"query = streamDf.writeStream.format(\"memory\").trigger(continuous=\"1 second\").queryName(\"nobels\").start()\n",
 "time.sleep(3)\n",
-"streamDs = spark.sql(\"select * from nobels\")\n",
-"display(streamDs)"
+"display(spark.sql(\"SELECT * FROM nobels\"))"
 ]
 },
 {
 "cell_type": "markdown",
 "metadata": {
 "application/vnd.databricks.v1+cell": {
-"cellMetadata": {},
+"cellMetadata": {
+"byteLimit": 2048000,
+"rowLimit": 10000
+},
 "inputWidgets": {},
-"nuid": "63658b45-2684-440f-9d2d-b0d84bb4af8f",
+"nuid": "db0205c2-3764-42c7-b90b-008afa288a89",
 "showTitle": false,
 "tableResultSettingsMap": {},
 "title": ""
 }
 },
 "source": [
-"## Writing to Redis\n",
-"\n",
-"1. Let's use the `df` data we imported earlier and write it back to Redis as JSON.\n",
-"1. Refresh **Redis Insight** and notice the new JSON keys prefixed with `spark:write`."
+"With the previously created streaming dataframe still running, we can add data to the stream and watch the dataframe receive the new data. In **Redis Insight**, change the \"All Key Types\" filter to show only keys of **Stream** type. Double-click the `nobels` stream and click `New Entry`. Add the following fields: `category`: `physics`, `id`: `123`, `share`: `1`, `year`: `2025`, plus values of your choice for `firstName`, `lastName`, and `motivation`. Hit `Save` and run the query again. You should now see your entry at the bottom of the table:"
 ]
 },
 {
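The new markdown cell above walks through adding a stream entry by hand in Redis Insight. The same step can be scripted, which makes the demo repeatable; a hedged redis-py sketch (the client choice and the name/motivation values are illustrative, not part of this commit):

```python
# Scripted equivalent of Redis Insight's "New Entry" on the `nobels` stream.
import redis

r = redis.from_url("redis://<user>:<pass>@<host>:<port>")  # placeholder URI
r.xadd("nobels", {
    "category": "physics",
    "id": "123",
    "share": "1",
    "year": "2025",
    "firstName": "Jane",          # illustrative values
    "lastName": "Doe",
    "motivation": "example entry",
})
```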
@@ -175,22 +212,25 @@
 "rowLimit": 10000
 },
 "inputWidgets": {},
-"nuid": "37786ad3-b0ec-49a9-8839-75ff907e4cae",
+"nuid": "551e6a88-a9f8-48ec-82b7-34f7f8326569",
 "showTitle": false,
 "tableResultSettingsMap": {},
 "title": ""
 }
 },
 "outputs": [],
 "source": [
-"df.write.format(\"redis\").option(\"type\", \"json\").option(\"keyspace\", \"spark:write:nobel\").option(\"key\", \"id\").mode(\"append\").save()"
+"display(spark.sql(\"SELECT * FROM nobels\"))"
 ]
 },
 {
 "cell_type": "markdown",
 "metadata": {
 "application/vnd.databricks.v1+cell": {
-"cellMetadata": {},
+"cellMetadata": {
+"byteLimit": 2048000,
+"rowLimit": 10000
+},
 "inputWidgets": {},
 "nuid": "f6ba51ee-705b-4e92-89e6-8530978ba987",
 "showTitle": false,
@@ -202,8 +242,7 @@
 "## Writing to Redis in Streaming Mode\n",
 "\n",
 "We can also write to Redis in streaming mode.\n",
-"1. Replace `<catalog>`, `<schema>`, and `<volume>` with names for a Unity Catalog volume and run the code below.\n",
-"2. In **Redis Insight** refresh the database and notice the new hash keys prefixed with `spark:writeStream`\n"
+"1. Replace `<catalog>`, `<schema>`, and `<volume>` with names for a Unity Catalog volume."
 ]
 },
 {
@@ -222,44 +261,61 @@
 "title": ""
 }
 },
-"outputs": [
-{
-"data": {
-"text/plain": [
-"<pyspark.sql.streaming.query.StreamingQuery at 0x7fd64dc7a4e0>"
-]
-},
-"execution_count": 78,
-"metadata": {},
-"output_type": "execute_result"
-}
-],
+"outputs": [],
 "source": [
 "catalog = \"<catalog>\"\n",
 "schema = \"<schema>\"\n",
-"volume = \"<volume>\"\n",
+"volume = \"<volume>\"\n",
 "\n",
 "path_volume = f\"/Volumes/{catalog}/{schema}/{volume}\"\n",
-"dbutils.fs.cp(\"http://storage.googleapis.com/jrx/nobels.jsonl\", f\"{path_volume}/nobels.json\")\n",
-"\n",
-"checkpoint_dir = f\"{path_volume}/checkpoint\"\n",
+"checkpoint_dir = f\"{path_volume}/mycp\"\n",
 "dbutils.fs.mkdirs(checkpoint_dir)\n",
 "\n",
-"spark.readStream.schema(\"id INT, year INT, category STRING, share INT, firstName STRING, lastName STRING, motivation STRING\").json(f\"{path_volume}/*.json\") \\\n",
-" .writeStream.format(\"redis\").outputMode(\"append\") \\\n",
+"streamDf.writeStream.format(\"redis\").outputMode(\"append\") \\\n",
 " .option(\"type\", \"hash\") \\\n",
-" .option(\"keyspace\", \"spark:writeStream:nobel\") \\\n",
+" .option(\"keyspace\", \"spark:nobel\") \\\n",
 " .option(\"key\", \"id\") \\\n",
 " .option(\"checkpointLocation\", checkpoint_dir) \\\n",
 " .start()\n"
 ]
+},
+{
+"cell_type": "markdown",
+"metadata": {
+"application/vnd.databricks.v1+cell": {
+"cellMetadata": {},
+"inputWidgets": {},
+"nuid": "b494d5a2-3187-4f8a-8632-33fee60d6492",
+"showTitle": false,
+"tableResultSettingsMap": {},
+"title": ""
+}
+},
+"source": [
+"In **Redis Insight**, select keys with the pattern `spark:nobel:*`. You should see hashes corresponding to the entries in the `nobels` stream that we used previously. If you add other entries to the stream as we did in the *Reading from Redis in Streaming Mode* section, you will see them reflected in the `spark:nobel` keyspace."
+]
+},
+{
+"cell_type": "markdown",
+"metadata": {
+"application/vnd.databricks.v1+cell": {
+"cellMetadata": {},
+"inputWidgets": {},
+"nuid": "49fa26c8-cd66-49f8-a5d4-b0c29098ffc6",
+"showTitle": false,
+"tableResultSettingsMap": {},
+"title": ""
+}
+},
+"source": []
 }
 ],
 "metadata": {
 "application/vnd.databricks.v1+notebook": {
 "computePreferences": null,
 "dashboards": [],
 "environmentMetadata": null,
+"inputWidgetPreferences": null,
 "language": "python",
 "notebookMetadata": {
 "pythonIndentUnit": 4
@@ -273,4 +329,4 @@
 },
 "nbformat": 4,
 "nbformat_minor": 0
-}
+}
