diff --git a/examples/notion/.env.example b/examples/notion/.env.example
new file mode 100644
index 00000000..95806911
--- /dev/null
+++ b/examples/notion/.env.example
@@ -0,0 +1,7 @@
+# Database Configuration
+COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+# Notion Configuration
+NOTION_TOKEN=secret_your_notion_integration_token_here
+NOTION_DATABASE_IDS=database_id_1,database_id_2
+NOTION_PAGE_IDS=page_id_1,page_id_2
diff --git a/examples/notion/README.md b/examples/notion/README.md
new file mode 100644
index 00000000..8e514082
--- /dev/null
+++ b/examples/notion/README.md
@@ -0,0 +1,69 @@
+This example builds an embedding index based on Notion databases and pages.
+It continuously updates the index as content is added, updated, or deleted in Notion,
+keeping the index in sync with your Notion workspace effortlessly.
+
+## Prerequisites
+
+Before running the example, you need to:
+
+1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.
+
+2. Prepare the Notion integration:
+   - Create a Notion integration at https://www.notion.so/my-integrations
+   - Copy the integration token (starts with `secret_`)
+   - Share your databases and pages with the integration
+
+3. Create a `.env` file with your Notion token and database/page IDs.
+   Start by copying `.env.example`, then edit it to fill in your configuration.
+
+   ```bash
+   cp .env.example .env
+   $EDITOR .env
+   ```
+
+   Example `.env` file:
+   ```
+   # Database Configuration
+   COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+   # Notion Configuration
+   NOTION_TOKEN=secret_your_notion_integration_token_here
+   NOTION_DATABASE_IDS=database_id_1,database_id_2
+   NOTION_PAGE_IDS=page_id_1,page_id_2
+   ```
+
+   Note: You can specify database IDs, page IDs, or both; all specified resources will be indexed.
+
+## Run
+
+Install dependencies:
+
+```sh
+pip install -e .
+```
+
+Run:
+
+```sh
+python main.py
+```
+
+While running, it keeps watching for changes in your Notion workspace and updates the index automatically.
+At the same time, it accepts queries from the terminal and performs searches against the up-to-date index.
+
+## CocoInsight
+CocoInsight is in Early Access now (Free) 😊 You found us! Here is a quick 3-minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).
+
+Run CocoInsight to understand your RAG data pipeline:
+
+```sh
+cocoindex server -ci main.py
+```
+
+You can also add the `-L` flag to make the server keep updating the index to reflect source changes at the same time:
+
+```sh
+cocoindex server -ci -L main.py
+```
+
+Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).
diff --git a/examples/notion/main.py b/examples/notion/main.py
new file mode 100644
index 00000000..b70e6247
--- /dev/null
+++ b/examples/notion/main.py
@@ -0,0 +1,160 @@
+from dotenv import load_dotenv
+from psycopg_pool import ConnectionPool
+import cocoindex
+import os
+from typing import Any
+
+
+@cocoindex.transform_flow()
+def text_to_embedding(
+    text: cocoindex.DataSlice[str],
+) -> cocoindex.DataSlice[list[float]]:
+    """
+    Embed the text using a SentenceTransformer model.
+    This logic is shared between indexing and querying, so it is extracted into a function.
+    """
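+    # Any SentenceTransformer-compatible model id should work here;
+    # all-MiniLM-L6-v2 is a lightweight default that produces 384-dimensional vectors.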
+ """ + return text.transform( + cocoindex.functions.SentenceTransformerEmbed( + model="sentence-transformers/all-MiniLM-L6-v2" + ) + ) + + +@cocoindex.flow_def(name="NotionTextEmbedding") +def notion_text_embedding_flow( + flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope +) -> None: + """ + Define an example flow that embeds text from Notion databases and pages into a vector database. + """ + notion_token = os.environ["NOTION_TOKEN"] + + # Add Notion source + database_ids = ( + os.environ.get("NOTION_DATABASE_IDS", "").split(",") + if os.environ.get("NOTION_DATABASE_IDS") + else [] + ) + page_ids = ( + os.environ.get("NOTION_PAGE_IDS", "").split(",") + if os.environ.get("NOTION_PAGE_IDS") + else [] + ) + + # For now, let's use only one type at a time to avoid conflicts + if database_ids: + data_scope["notion_content"] = flow_builder.add_source( + cocoindex.sources.Notion( + token=notion_token, + source_type="database", + database_ids=database_ids, + ) + ) + elif page_ids: + data_scope["notion_content"] = flow_builder.add_source( + cocoindex.sources.Notion( + token=notion_token, + source_type="page", + page_ids=page_ids, + ) + ) + else: + # If no IDs provided, create a dummy source that won't produce any data + data_scope["notion_content"] = flow_builder.add_source( + cocoindex.sources.Notion( + token=notion_token, + source_type="page", + page_ids=[], + ) + ) + + doc_embeddings = data_scope.add_collector() + + # Process Notion content + with data_scope["notion_content"].row() as notion_entry: + print(f"""DEBUG: Processing notion entry content {notion_entry["content"]}""") + + notion_entry["chunks"] = notion_entry["content"].transform( + cocoindex.functions.SplitRecursively(), + language="markdown", + chunk_size=200, + chunk_overlap=0, + ) + + with notion_entry["chunks"].row() as chunk: + print("row") + chunk["embedding"] = text_to_embedding(chunk["text"]) + doc_embeddings.collect( + notion_id=notion_entry["id"], + title=notion_entry["title"], + location=chunk["location"], + text=chunk["text"], + embedding=chunk["embedding"], + ) + + doc_embeddings.export( + "doc_embeddings", + cocoindex.targets.Postgres(), + primary_key_fields=["notion_id", "location"], + vector_indexes=[ + cocoindex.VectorIndexDef( + field_name="embedding", + metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY, + ) + ], + ) + + +def search(pool: ConnectionPool, query: str, top_k: int = 5) -> list[dict[str, Any]]: + # Get the table name, for the export target in the notion_text_embedding_flow above. + table_name = cocoindex.utils.get_target_default_name( + notion_text_embedding_flow, "doc_embeddings" + ) + # Evaluate the transform flow defined above with the input query, to get the embedding. + query_vector = text_to_embedding.eval(query) + # Run the query and get the results. + with pool.connection() as conn: + with conn.cursor() as cur: + cur.execute( + f""" + SELECT title, text, embedding <=> %s::vector AS distance + FROM {table_name} ORDER BY distance LIMIT %s + """, + (query_vector, top_k), + ) + return [ + { + "title": row[0], + "text": row[1], + "score": 1.0 - row[2], + } + for row in cur.fetchall() + ] + + +def _main() -> None: + # Initialize the database connection pool. + pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL")) + + notion_text_embedding_flow.setup() + # with cocoindex.FlowLiveUpdater(notion_text_embedding_flow): + if 1: + # Run queries in a loop to demonstrate the query capabilities. 
+        while True:
+            query = input("Enter search query (or Enter to quit): ")
+            if query == "":
+                break
+            # Run the search function with the database connection pool and the query.
+            results = search(pool, query)
+            print("\nSearch results:")
+            for result in results:
+                print(f"[{result['score']:.3f}] {result['title']}")
+                print(f"    {result['text']}")
+                print("---")
+            print()
+
+
+if __name__ == "__main__":
+    load_dotenv()
+    cocoindex.init()
+    _main()
diff --git a/examples/notion/pyproject.toml b/examples/notion/pyproject.toml
new file mode 100644
index 00000000..4f63300a
--- /dev/null
+++ b/examples/notion/pyproject.toml
@@ -0,0 +1,9 @@
+[project]
+name = "notion-text-embedding"
+version = "0.1.0"
+description = "Simple example for cocoindex: build an embedding index based on Notion databases and pages."
+requires-python = ">=3.11"
+dependencies = ["cocoindex[embeddings]>=0.1.63", "python-dotenv>=1.0.1"]
+
+[tool.setuptools]
+packages = []
diff --git a/python/cocoindex/sources.py b/python/cocoindex/sources.py
index d18f9934..2bbfe457 100644
--- a/python/cocoindex/sources.py
+++ b/python/cocoindex/sources.py
@@ -43,3 +43,14 @@ class AmazonS3(op.SourceSpec):
     included_patterns: list[str] | None = None
     excluded_patterns: list[str] | None = None
     sqs_queue_url: str | None = None
+
+
+class Notion(op.SourceSpec):
+    """Import data from Notion databases and pages."""
+
+    _op_category = op.OpCategory.SOURCE
+
+    token: str
+    source_type: str  # "database" or "page"
+    database_ids: list[str] | None = None
+    page_ids: list[str] | None = None
diff --git a/src/ops/registration.rs b/src/ops/registration.rs
index a7710b9c..1d52bcee 100644
--- a/src/ops/registration.rs
+++ b/src/ops/registration.rs
@@ -11,6 +11,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
     sources::local_file::Factory.register(registry)?;
     sources::google_drive::Factory.register(registry)?;
     sources::amazon_s3::Factory.register(registry)?;
+    sources::notion::Factory.register(registry)?;
 
     functions::parse_json::Factory.register(registry)?;
     functions::split_recursively::register(registry)?;
diff --git a/src/ops/sources/mod.rs b/src/ops/sources/mod.rs
index 557f44f7..faadb74a 100644
--- a/src/ops/sources/mod.rs
+++ b/src/ops/sources/mod.rs
@@ -1,3 +1,4 @@
 pub mod amazon_s3;
 pub mod google_drive;
 pub mod local_file;
+pub mod notion;
diff --git a/src/ops/sources/notion.rs b/src/ops/sources/notion.rs
new file mode 100644
index 00000000..7d5ec988
--- /dev/null
+++ b/src/ops/sources/notion.rs
@@ -0,0 +1,502 @@
+use crate::fields_value;
+use async_stream::try_stream;
+use reqwest::Client;
+use serde::Deserialize;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
+use crate::base::field_attrs;
+use crate::base::value::FieldValues;
+use crate::ops::sdk::*;
+
+#[derive(Debug, Deserialize)]
+pub struct Spec {
+    token: String,
+    source_type: String, // "database" or "page"
+    database_ids: Option<Vec<String>>,
+    page_ids: Option<Vec<String>>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionPage {
+    id: String,
+    created_time: String,
+    last_edited_time: String,
+    properties: HashMap<String, NotionProperty>,
+    #[serde(default)]
+    url: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionProperty {
+    #[serde(rename = "type")]
+    property_type: String,
+    title: Option<Vec<NotionRichText>>,
+    rich_text: Option<Vec<NotionRichText>>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionRichText {
+    plain_text: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionDatabaseQueryResponse {
+    results: Vec<NotionPage>,
+    has_more: bool,
+    next_cursor: Option<String>,
+}
+
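+// Response from GET /v1/pages/{page_id}; it carries the same fields as NotionPage.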
+#[derive(Debug, Deserialize)]
+struct NotionPageResponse {
+    id: String,
+    created_time: String,
+    last_edited_time: String,
+    properties: HashMap<String, NotionProperty>,
+    #[serde(default)]
+    url: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionBlocksResponse {
+    results: Vec<NotionBlock>,
+    has_more: bool,
+    next_cursor: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionBlock {
+    id: String,
+    #[serde(rename = "type")]
+    block_type: String,
+    paragraph: Option<NotionParagraph>,
+    heading_1: Option<NotionHeading>,
+    heading_2: Option<NotionHeading>,
+    heading_3: Option<NotionHeading>,
+    bulleted_list_item: Option<NotionListItem>,
+    numbered_list_item: Option<NotionListItem>,
+    code: Option<NotionCode>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionParagraph {
+    rich_text: Vec<NotionRichText>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionHeading {
+    rich_text: Vec<NotionRichText>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionListItem {
+    rich_text: Vec<NotionRichText>,
+}
+
+#[derive(Debug, Deserialize)]
+struct NotionCode {
+    rich_text: Vec<NotionRichText>,
+}
+
+struct Executor {
+    client: Client,
+    token: String,
+    source_type: String,
+    database_ids: Vec<String>,
+    page_ids: Vec<String>,
+    // Cache to prevent concurrent processing of the same page
+    processing_cache: Arc<std::sync::Mutex<HashMap<String, Arc<Mutex<()>>>>>,
+}
+
+impl Executor {
+    fn new(spec: Spec) -> Self {
+        let client = Client::new();
+        let database_ids = spec.database_ids.unwrap_or_default();
+        let page_ids = spec.page_ids.unwrap_or_default();
+
+        Self {
+            client,
+            token: spec.token,
+            source_type: spec.source_type,
+            database_ids,
+            page_ids,
+            processing_cache: Arc::new(std::sync::Mutex::new(HashMap::new())),
+        }
+    }
+
+    async fn fetch_database_pages(&self, database_id: &str) -> Result<Vec<NotionPage>> {
+        let mut all_pages = Vec::new();
+        let mut cursor = None;
+        let mut seen_ids = HashSet::new();
+
+        loop {
+            let url = format!("https://api.notion.com/v1/databases/{}/query", database_id);
+
+            let mut body = serde_json::json!({
+                "page_size": 100
+            });
+
+            if let Some(cursor) = cursor {
+                body["start_cursor"] = serde_json::Value::String(cursor);
+            }
+
+            let response = self
+                .client
+                .post(&url)
+                .header("Authorization", format!("Bearer {}", self.token))
+                .header("Notion-Version", "2022-06-28")
+                .header("Content-Type", "application/json")
+                .json(&body)
+                .send()
+                .await?;
+
+            if !response.status().is_success() {
+                return Err(anyhow::anyhow!("Notion API error: {}", response.status()));
+            }
+
+            let query_response: NotionDatabaseQueryResponse = response.json().await?;
+
+            // Filter out duplicates
+            for page in query_response.results {
+                if seen_ids.insert(page.id.clone()) {
+                    all_pages.push(page);
+                }
+            }
+
+            if !query_response.has_more {
+                break;
+            }
+            cursor = query_response.next_cursor;
+        }
+
+        Ok(all_pages)
+    }
+
+    async fn fetch_page(&self, page_id: &str) -> Result<NotionPage> {
+        let url = format!("https://api.notion.com/v1/pages/{}", page_id);
+
+        let response = self
+            .client
+            .get(&url)
+            .header("Authorization", format!("Bearer {}", self.token))
+            .header("Notion-Version", "2022-06-28")
+            .send()
+            .await?;
+
+        if !response.status().is_success() {
+            return Err(anyhow::anyhow!("Notion API error: {}", response.status()));
+        }
+
+        let page: NotionPageResponse = response.json().await?;
+        Ok(NotionPage {
+            id: page.id,
+            created_time: page.created_time,
+            last_edited_time: page.last_edited_time,
+            properties: page.properties,
+            url: page.url,
+        })
+    }
+
+    async fn fetch_page_content(&self, page_id: &str) -> Result<String> {
+        let mut content = String::new();
+        let mut cursor = None;
+
+        loop {
+            let mut url = format!("https://api.notion.com/v1/blocks/{}/children", page_id);
+            if let Some(cursor) = &cursor {
+                url.push_str(&format!("?start_cursor={}", cursor));
+            }
+
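+            // Fetch one page of child blocks; the Notion API returns at most 100
+            // blocks per request, so we keep following next_cursor until has_more is false.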
+            let response = self
+                .client
+                .get(&url)
+                .header("Authorization", format!("Bearer {}", self.token))
+                .header("Notion-Version", "2022-06-28")
+                .send()
+                .await?;
+
+            if !response.status().is_success() {
+                return Err(anyhow::anyhow!("Notion API error: {}", response.status()));
+            }
+
+            let blocks_response: NotionBlocksResponse = response.json().await?;
+
+            for block in blocks_response.results {
+                match block.block_type.as_str() {
+                    "paragraph" => {
+                        if let Some(paragraph) = block.paragraph {
+                            let text = paragraph
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&text);
+                            content.push('\n');
+                        }
+                    }
+                    "heading_1" => {
+                        if let Some(heading) = block.heading_1 {
+                            let text = heading
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("# {}\n", text));
+                        }
+                    }
+                    "heading_2" => {
+                        if let Some(heading) = block.heading_2 {
+                            let text = heading
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("## {}\n", text));
+                        }
+                    }
+                    "heading_3" => {
+                        if let Some(heading) = block.heading_3 {
+                            let text = heading
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("### {}\n", text));
+                        }
+                    }
+                    "bulleted_list_item" => {
+                        if let Some(list_item) = block.bulleted_list_item {
+                            let text = list_item
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("- {}\n", text));
+                        }
+                    }
+                    "numbered_list_item" => {
+                        if let Some(list_item) = block.numbered_list_item {
+                            let text = list_item
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("1. {}\n", text));
+                        }
+                    }
+                    "code" => {
+                        if let Some(code) = block.code {
+                            let text = code
+                                .rich_text
+                                .iter()
+                                .map(|rt| rt.plain_text.as_str())
+                                .collect::<Vec<&str>>()
+                                .join("");
+                            content.push_str(&format!("```\n{}\n```\n", text));
+                        }
+                    }
+                    _ => {}
+                }
+            }
+
+            if !blocks_response.has_more {
+                break;
+            }
+            cursor = blocks_response.next_cursor;
+        }
+
+        Ok(content)
+    }
+
+    fn extract_title_from_properties(
+        &self,
+        properties: &HashMap<String, NotionProperty>,
+    ) -> String {
+        for (_, property) in properties {
+            if let Some(title) = &property.title {
+                if !title.is_empty() {
+                    return title
+                        .iter()
+                        .map(|rt| rt.plain_text.as_str())
+                        .collect::<Vec<&str>>()
+                        .join("");
+                }
+            }
+        }
+        "Untitled".to_string()
+    }
+
+    fn parse_datetime(&self, datetime_str: &str) -> Result<Ordinal> {
+        let dt = chrono::DateTime::parse_from_rfc3339(datetime_str)?;
+        Ok(Ordinal(Some(dt.timestamp_micros())))
+    }
+}
+
+#[async_trait]
+impl SourceExecutor for Executor {
+    fn list<'a>(
+        &'a self,
+        _options: &'a SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<PartialSourceRowMetadata>>> {
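+        // List page IDs as row keys, with last_edited_time as the ordinal so the
+        // framework can detect which rows changed between refreshes.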
+        try_stream! {
+            let mut batch = Vec::new();
+            let mut seen_ids = HashSet::new();
+
+            // Handle database pages
+            if self.source_type == "database" {
+                for database_id in &self.database_ids {
+                    let pages = self.fetch_database_pages(database_id).await?;
+                    for page in pages {
+                        if seen_ids.insert(page.id.clone()) {
+                            batch.push(PartialSourceRowMetadata {
+                                key: KeyValue::Str(page.id.into()),
+                                ordinal: Some(self.parse_datetime(&page.last_edited_time)?),
+                            });
+                        }
+                    }
+                }
+            }
+
+            // Handle individual pages
+            if self.source_type == "page" {
+                for page_id in &self.page_ids {
+                    let page = self.fetch_page(page_id).await?;
+                    if seen_ids.insert(page.id.clone()) {
+                        batch.push(PartialSourceRowMetadata {
+                            key: KeyValue::Str(page.id.into()),
+                            ordinal: Some(self.parse_datetime(&page.last_edited_time)?),
+                        });
+                    }
+                }
+            }
+
+            if !batch.is_empty() {
+                yield batch;
+            }
+        }
+        .boxed()
+    }
+
+    async fn get_value(
+        &self,
+        key: &KeyValue,
+        options: &SourceExecutorGetOptions,
+    ) -> Result<PartialSourceRowData> {
+        let page_id = key.str_value()?;
+
+        // Get or create a mutex for this specific page to prevent concurrent processing.
+        let page_mutex = {
+            let mut cache = self.processing_cache.lock().unwrap();
+            cache
+                .entry(page_id.to_string())
+                .or_insert_with(|| Arc::new(Mutex::new(())))
+                .clone()
+        };
+
+        // Lock the mutex for this page so only one task processes it at a time.
+        let _lock = page_mutex.lock().await;
+
+        let page = match self.fetch_page(page_id.as_ref()).await {
+            Ok(page) => page,
+            Err(_) => {
+                return Ok(PartialSourceRowData {
+                    value: Some(SourceValue::NonExistence),
+                    ordinal: Some(Ordinal::unavailable()),
+                });
+            }
+        };
+
+        let ordinal = if options.include_ordinal {
+            Some(self.parse_datetime(&page.last_edited_time)?)
+        } else {
+            None
+        };
+
+        let value = if options.include_value {
+            let title = self.extract_title_from_properties(&page.properties);
+            let content = self.fetch_page_content(page_id.as_ref()).await?;
+
+            let fields = vec![
+                page.id.into(),
+                title.into(),
+                content.into(),
+                page.url.unwrap_or_default().into(),
+            ];
+
+            Some(SourceValue::Existence(FieldValues { fields }))
+        } else {
+            None
+        };
+
+        Ok(PartialSourceRowData { value, ordinal })
+    }
+
+    async fn change_stream(
+        &self,
+    ) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
+        Ok(None)
+    }
+}
+
+pub struct Factory;
+
+#[async_trait]
+impl SourceFactoryBase for Factory {
+    type Spec = Spec;
+
+    fn name(&self) -> &str {
+        "Notion"
+    }
+
+    async fn get_output_schema(
+        &self,
+        _spec: &Spec,
+        _context: &FlowInstanceContext,
+    ) -> Result<EnrichedValueType> {
+        let mut struct_schema = StructSchema::default();
+        let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
+
+        schema_builder.add_field(FieldSchema::new(
+            "id",
+            make_output_type(BasicValueType::Str),
+        ));
+
+        let title_field = schema_builder.add_field(FieldSchema::new(
+            "title",
+            make_output_type(BasicValueType::Str),
+        ));
+
+        schema_builder.add_field(FieldSchema::new(
+            "content",
+            make_output_type(BasicValueType::Str).with_attr(
+                field_attrs::CONTENT_FILENAME,
+                serde_json::to_value(title_field.to_field_ref())?,
+            ),
+        ));
+
+        schema_builder.add_field(FieldSchema::new(
+            "url",
+            make_output_type(BasicValueType::Str),
+        ));
+
+        Ok(make_output_type(TableSchema::new(
+            TableKind::KTable,
+            struct_schema,
+        )))
+    }
+
+    async fn build_executor(
+        self: Arc<Self>,
+        spec: Spec,
+        _context: Arc<FlowInstanceContext>,
+    ) -> Result<Box<dyn SourceExecutor>> {
+        Ok(Box::new(Executor::new(spec)))
+    }
+}