add Azure Blob Storage source with full authentication support #736

Merged
merged 1 commit into from
Jul 14, 2025
432 changes: 420 additions & 12 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -114,6 +114,10 @@ json5 = "0.4.1"
aws-config = "1.6.2"
aws-sdk-s3 = "1.85.0"
aws-sdk-sqs = "1.67.0"
azure_core = "0.21.0"
azure_storage = "0.21.0"
azure_storage_blobs = "0.21.0"
time = { version = "0.3", features = ["macros", "serde"] }
numpy = "0.25.0"
infer = "0.19.0"
serde_with = { version = "3.13.0", features = ["base64"] }
6 changes: 3 additions & 3 deletions README.md
@@ -29,7 +29,7 @@ Ultra performant data transformation framework for AI, with core engine written

</br>

CocoIndex makes it super easy to transform data with AI workloads, and keep source data and target in sync effortlessly.

</br>

@@ -39,7 +39,7 @@

</br>

Whether creating embeddings, building knowledge graphs, or running any other data transformations - beyond traditional SQL.

## Exceptional velocity
Just declare transformations in dataflow with ~100 lines of Python
@@ -65,7 +65,7 @@
**Particularly**, developers don't explicitly mutate data by creating, updating and deleting. They just need to define transformation/formula for a set of source data.

## Build like LEGO
Native builtins for different sources, targets and transformations. Standardized interfaces make it a 1-line code change to switch between different components.

<p align="center">
<img src="https://cocoindex.io/images/components.svg" alt="CocoIndex Features">
22 changes: 22 additions & 0 deletions examples/azure_blob_embedding/.env.example
@@ -0,0 +1,22 @@
# Database Configuration
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# Azure Blob Storage Configuration (Public test container - ready to use!)
AZURE_STORAGE_ACCOUNT_NAME=testnamecocoindex1
AZURE_BLOB_CONTAINER_NAME=testpublic1
AZURE_BLOB_PREFIX=

# Authentication Options (choose ONE - in priority order):

# Option 1: Connection String (HIGHEST PRIORITY - recommended for development)
# NOTE: Use ACCOUNT KEY connection string, NOT SAS connection string!
# AZURE_BLOB_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=testnamecocoindex1;AccountKey=key1-goes-here;EndpointSuffix=core.windows.net

# Option 2: SAS Token (SECOND PRIORITY - recommended for production)
# AZURE_BLOB_SAS_TOKEN=sp=r&st=2024-01-01T00:00:00Z&se=2025-12-31T23:59:59Z&spr=https&sv=2022-11-02&sr=c&sig=...

# Option 3: Account Key (THIRD PRIORITY)
# AZURE_BLOB_ACCOUNT_KEY=key1-goes-here

# Option 4: Anonymous access (FALLBACK - for public containers only)
# Leave all auth options commented out - testpublic1 container supports this!
153 changes: 153 additions & 0 deletions examples/azure_blob_embedding/README.md
@@ -0,0 +1,153 @@
This example builds an embedding index based on files stored in an Azure Blob Storage container.
It continuously updates the index as files are added, updated, or deleted in the source container,
keeping the index in sync with the Azure Blob Storage container effortlessly.

## Quick Start (Public Test Container)

🚀 **Try it immediately!** We provide a public test container with sample documents:
- **Account:** `testnamecocoindex1`
- **Container:** `testpublic1` (public access)
- **No authentication required!**

Just copy `.env.example` to `.env` and run - it works out of the box with anonymous access.

## Prerequisite

Before running the example, you need to:

1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.

2. Prepare your Azure Blob Storage.
   You'll need an Azure Storage account and container. Supported authentication methods:
   - **Connection String** (recommended for development)
   - **SAS Token** (recommended for production)
   - **Account Key** (full access)
   - **Anonymous access** (for public containers only)

3. Create a `.env` file with your Azure Blob Storage configuration.
   Start by copying `.env.example`, then edit it to fill in your credentials.

```bash
cp .env.example .env
$EDITOR .env
```

Example `.env` file with connection string:
```
# Database Configuration
COCOINDEX_DATABASE_URL=postgresql://localhost:5432/cocoindex

# Azure Blob Storage Configuration
AZURE_STORAGE_ACCOUNT_NAME=mystorageaccount
AZURE_BLOB_CONTAINER_NAME=mydocuments
AZURE_BLOB_PREFIX=

# Authentication (choose one)
AZURE_BLOB_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=mykey123;EndpointSuffix=core.windows.net
```

## Run

Install dependencies:

```sh
pip install -e .
```

Run:

```sh
python main.py
```

While running, it keeps observing changes in the Azure Blob Storage container and updates the index automatically.
At the same time, it accepts queries from the terminal and performs searches on top of the up-to-date index.


## CocoInsight
CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3 minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).

Run CocoInsight to understand your RAG data pipeline:

```sh
cocoindex server -ci main.py
```

You can also add a `-L` flag to make the server keep updating the index to reflect source changes at the same time:

```sh
cocoindex server -ci -L main.py
```

Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).

## Authentication Methods & Troubleshooting

### Connection String (Recommended for Development)
```bash
AZURE_BLOB_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=testnamecocoindex1;AccountKey=your-key;EndpointSuffix=core.windows.net"
```
- **Pros:** Easiest to set up, contains all necessary information
- **Cons:** Contains account key (full access)
- **⚠️ Important:** Use **Account Key** connection string, NOT SAS connection string!

### SAS Token (Recommended for Production)
```bash
AZURE_BLOB_SAS_TOKEN="sp=r&st=2024-01-01T00:00:00Z&se=2025-12-31T23:59:59Z&spr=https&sv=2022-11-02&sr=c&sig=..."
```
- **Pros:** Fine-grained permissions, time-limited
- **Cons:** More complex to generate and manage

**SAS Token Requirements:**
- `sp=r` - Read permission (required)
- `sp=rl` - Read + List permissions (recommended)
- `sr=c` - Container scope (to access all blobs)
- Valid time range (`st` and `se` in UTC)
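
These requirements can be checked mechanically before use. Below is a minimal sketch (the helper name and specific checks are our own, not part of this example) that parses a SAS token with Python's standard library and flags the common problems listed above:

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs


def check_sas_token(sas_token: str) -> list[str]:
    """Return a list of problems found in a SAS token (empty list = looks OK)."""
    params = parse_qs(sas_token.lstrip("?"))
    problems = []
    perms = params.get("sp", [""])[0]
    if "r" not in perms:
        problems.append("missing read permission (sp must include 'r')")
    if "l" not in perms:
        problems.append("missing list permission (sp should include 'l')")
    if params.get("sr", [""])[0] != "c":
        problems.append("not container-scoped (sr should be 'c')")
    expiry = params.get("se", [""])[0]
    if expiry:
        # SAS times use the 'Z' suffix; convert for fromisoformat.
        se = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
        if se <= datetime.now(timezone.utc):
            problems.append("token expired (se is in the past)")
    return problems


# Usage: an empty result means the token satisfies the checks above.
check_sas_token("sp=rl&st=2024-01-01T00:00:00Z&se=2099-12-31T23:59:59Z&sr=c&sig=x")
```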

### Account Key
```bash
AZURE_BLOB_ACCOUNT_KEY="your-account-key-here"
```
- **Pros:** Simple to use
- **Cons:** Full account access, security risk

### Anonymous Access
Leave all authentication options empty - only works with public containers.

## Common Issues

### 401 Authentication Error
```
Error: server returned error status which will not be retried: 401
Error Code: NoAuthenticationInformation
```
**Solutions:**
1. **Check authentication priority:** Connection String > SAS Token > Account Key > Anonymous
2. **Verify SAS token permissions:** Must include `r` (read) and `l` (list) permissions
3. **Check SAS token expiry:** Ensure `se` (expiry time) is in the future
4. **Verify container scope:** Use `sr=c` for container-level access

### Connection String Issues

**⚠️ CRITICAL: Use an Account Key connection string, NOT a SAS connection string!**

**✅ Correct (Account Key connection string):**
```
DefaultEndpointsProtocol=https;AccountName=testnamecocoindex1;AccountKey=your-key;EndpointSuffix=core.windows.net
```

**❌ Wrong (SAS connection string - will not work):**
```
BlobEndpoint=https://testnamecocoindex1.blob.core.windows.net/;SharedAccessSignature=sp=r&st=...
```

**Other tips:**
- Don't include quotes in the actual connection string value
- The account name in the connection string should match `AZURE_STORAGE_ACCOUNT_NAME`
- The connection string must contain an `AccountKey=` parameter

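
A quick way to tell the two connection string forms apart is to look at the key before the first `=` in each `;`-separated segment. A hypothetical checker (the function name is ours, for illustration):

```python
def classify_connection_string(conn_str: str) -> str:
    """Classify a connection string as account-key style, SAS style, or unknown."""
    keys = {
        segment.split("=", 1)[0]
        for segment in conn_str.strip().strip('"').split(";")
        if "=" in segment
    }
    if "AccountKey" in keys:
        return "account-key"  # supported by this source
    if "SharedAccessSignature" in keys:
        return "sas"  # not usable here; use AZURE_BLOB_SAS_TOKEN instead
    return "unknown"
```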
### Container Access Issues
- Verify container exists and account has access
- Check `AZURE_BLOB_CONTAINER_NAME` spelling
- For anonymous access, container must be public
131 changes: 131 additions & 0 deletions examples/azure_blob_embedding/main.py
@@ -0,0 +1,131 @@
from dotenv import load_dotenv
from psycopg_pool import ConnectionPool
import cocoindex
import os
from typing import Any


@cocoindex.transform_flow()
def text_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[list[float]]:
    """
    Embed the text using a SentenceTransformer model.
    This is shared logic between indexing and querying, so extract it as a function.
    """
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )


@cocoindex.flow_def(name="AzureBlobTextEmbedding")
def azure_blob_text_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """
    Define an example flow that embeds text from Azure Blob Storage into a vector database.
    """
    account_name = os.environ["AZURE_STORAGE_ACCOUNT_NAME"]
    container_name = os.environ["AZURE_BLOB_CONTAINER_NAME"]
    prefix = os.environ.get("AZURE_BLOB_PREFIX", None)

    # Authentication options (in priority order)
    connection_string = os.environ.get("AZURE_BLOB_CONNECTION_STRING", None)
    account_key = os.environ.get("AZURE_BLOB_ACCOUNT_KEY", None)
    sas_token = os.environ.get("AZURE_BLOB_SAS_TOKEN", None)

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AzureBlob(
            account_name=account_name,
            container_name=container_name,
            prefix=prefix,
            included_patterns=["*.md", "*.mdx", "*.txt", "*.docx"],
            binary=False,
            connection_string=connection_string,
            account_key=account_key,
            sas_token=sas_token,
        )
    )

    doc_embeddings = data_scope.add_collector()

    with data_scope["documents"].row() as doc:
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown",
            chunk_size=2000,
            chunk_overlap=500,
        )

        with doc["chunks"].row() as chunk:
            chunk["embedding"] = text_to_embedding(chunk["text"])
            doc_embeddings.collect(
                filename=doc["filename"],
                location=chunk["location"],
                text=chunk["text"],
                embedding=chunk["embedding"],
            )

    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Postgres(),
        primary_key_fields=["filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )


def search(pool: ConnectionPool, query: str, top_k: int = 5) -> list[dict[str, Any]]:
    # Get the table name for the export target in the azure_blob_text_embedding_flow above.
    table_name = cocoindex.utils.get_target_default_name(
        azure_blob_text_embedding_flow, "doc_embeddings"
    )
    # Evaluate the transform flow defined above with the input query, to get the embedding.
    query_vector = text_to_embedding.eval(query)
    # Run the query and get the results.
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT filename, text, embedding <=> %s::vector AS distance
                FROM {table_name} ORDER BY distance LIMIT %s
                """,
                (query_vector, top_k),
            )
            return [
                {"filename": row[0], "text": row[1], "score": 1.0 - row[2]}
                for row in cur.fetchall()
            ]


def _main() -> None:
    # Initialize the database connection pool; fail fast if the URL is not set.
    pool = ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])

    azure_blob_text_embedding_flow.setup()
    with cocoindex.FlowLiveUpdater(azure_blob_text_embedding_flow):
        # Run queries in a loop to demonstrate the query capabilities.
        while True:
            query = input("Enter search query (or Enter to quit): ")
            if query == "":
                break
            # Run the query function with the database connection pool and the query.
            results = search(pool, query)
            print("\nSearch results:")
            for result in results:
                print(f"[{result['score']:.3f}] {result['filename']}")
                print(f"    {result['text']}")
                print("---")
            print()


if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    _main()
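
In `search` above, pgvector's `<=>` operator returns cosine distance, so `1.0 - distance` converts it to cosine similarity for the reported score. A plain-Python sketch of that relationship (illustrative only, not part of the example):

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance as pgvector's <=> operator computes it: 1 - cos(a, b)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)


def score(distance: float) -> float:
    """The similarity score reported by search()."""
    return 1.0 - distance
```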
9 changes: 9 additions & 0 deletions examples/azure_blob_embedding/pyproject.toml
@@ -0,0 +1,9 @@
[project]
name = "azure-blob-text-embedding"
version = "0.1.0"
description = "Simple example for cocoindex: build embedding index based on Azure Blob Storage files."
requires-python = ">=3.11"
dependencies = ["cocoindex[embeddings]>=0.1.63", "python-dotenv>=1.0.1"]

[tool.setuptools]
packages = []
23 changes: 23 additions & 0 deletions python/cocoindex/sources.py
@@ -43,3 +43,26 @@ class AmazonS3(op.SourceSpec):
    included_patterns: list[str] | None = None
    excluded_patterns: list[str] | None = None
    sqs_queue_url: str | None = None


class AzureBlob(op.SourceSpec):
    """Import data from an Azure Blob Storage container. Supports optional prefix and file filtering by glob patterns.

    Authentication options (in priority order):
    1. connection_string - Full connection string with credentials
    2. sas_token - Shared Access Signature token
    3. account_key - Storage account access key
    4. None - Anonymous access (for public containers)
    """

    _op_category = op.OpCategory.SOURCE

    account_name: str
    container_name: str
    prefix: str | None = None
    binary: bool = False
    included_patterns: list[str] | None = None
    excluded_patterns: list[str] | None = None
    account_key: str | None = None
    sas_token: str | None = None
    connection_string: str | None = None
1 change: 1 addition & 0 deletions src/ops/registration.rs
@@ -11,6 +11,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
    sources::local_file::Factory.register(registry)?;
    sources::google_drive::Factory.register(registry)?;
    sources::amazon_s3::Factory.register(registry)?;
    sources::azure_blob::Factory.register(registry)?;

    functions::parse_json::Factory.register(registry)?;
    functions::split_recursively::register(registry)?;