import os
import json
import random
import string
from google import genai
from google.genai.types import EmbedContentConfig
from langchain.text_splitter import RecursiveCharacterTextSplitter  # newer LangChain releases also expose this as langchain_text_splitters
from google.cloud import storage
from google.cloud import aiplatform
import functions_framework

# Cloud Storage Client
storage_client = storage.Client()

# Vertex AI Embeddings API (genai SDK)
os.environ["GOOGLE_CLOUD_PROJECT"] = os.getenv("PROJECT_ID")
os.environ["GOOGLE_CLOUD_LOCATION"] = os.getenv("GCP_REGION")
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "true"
genai_client = genai.Client()  # Embeddings API

# Vertex AI Vector Search (fka Matching Engine)
project_id = os.getenv("PROJECT_ID")
location = os.getenv("GCP_REGION")
index_id = os.getenv("VECTOR_SEARCH_INDEX_ID")
index_endpoint_name = os.getenv("VECTOR_SEARCH_INDEX_ENDPOINT_NAME")
aiplatform.init(project=project_id, location=location)  # init() configures the SDK and returns None, so there is nothing to assign
index = aiplatform.MatchingEngineIndex(index_id)
index_endpoint = aiplatform.MatchingEngineIndexEndpoint(
    index_endpoint_name=index_endpoint_name
)
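
# Environment variables used throughout this function:
#   PROJECT_ID, GCP_REGION             - Google Cloud project and region for Vertex AI
#   VECTOR_SEARCH_INDEX_ID             - Vector Search index that embeddings are upserted into
#   VECTOR_SEARCH_INDEX_ENDPOINT_NAME  - index endpoint used for queries
#   VECTOR_SEARCH_DEPLOYED_INDEX_ID    - deployed index ID passed to find_neighbors()
#   GCS_BUCKET, INPUT_DOC_FILENAME     - set at runtime from the triggering CloudEvent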


# ------- HELPER FUNCTIONS --------------------------
def randomStringDigits(stringLength=5):
    """Generate a random string of digits to use as a datapoint ID"""
    digits = string.digits
    return "".join(random.choice(digits) for _ in range(stringLength))

def gcs_download_document(bucket_name, blob_name):
    """
    Downloads a raw text document from a Cloud Storage bucket
    """
    print(f"🪣 Downloading doc from GCS: {bucket_name}/{blob_name}")
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    dl = blob.download_as_bytes()  # download_as_string() is deprecated in google-cloud-storage
    # clean up the text: collapse newlines into spaces before chunking
    return dl.decode("utf-8").strip().replace("\n", " ")


def chunk_text(text, chunk_size=500):
    """
    Chunks raw document text into roughly 500-character chunks, while preserving individual words.
    https://python.langchain.com/api_reference/text_splitters/character/langchain_text_splitters.character.RecursiveCharacterTextSplitter.html#recursivecharactertextsplitter

    Why 500? Another Vertex AI product, Vertex AI Search, uses a default chunk size of 500 tokens.
    https://cloud.google.com/vertex-ai/generative-ai/docs/embeddings/get-text-embeddings#googlegenaisdk_embeddings_docretrieval_with_txt-python_genai_sdk
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)
    return splitter.split_text(text)
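
# Illustrative sketch of the behavior (not called in the deployed function): the splitter
# returns pieces of at most ~chunk_size characters, breaking on whitespace rather than
# mid-word, e.g.
#   chunk_text("word " * 300)  ->  ["word word word ...", "word word ...", ...]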


def get_embeddings(text_chunks):
    """
    Call Vertex AI Embeddings API (text-embedding-005 model) to generate vector representations of all document chunks
    """
    to_write = []
    for i, chunk in enumerate(text_chunks):
        print(f"⏲️ Generating embeddings for chunk {i}")
        response = genai_client.models.embed_content(
            model="text-embedding-005",
            contents=[chunk],
            config=EmbedContentConfig(
                task_type="RETRIEVAL_DOCUMENT",
                output_dimensionality=768,
            ),
        )
        emb = response.embeddings[0].values
        body = {
            "id": randomStringDigits(stringLength=5),
            "text": chunk,
            "embedding": emb,
        }
        to_write.append(body)
    return to_write


def write_embeddings_to_jsonl(embeddings, outfile):
    """
    Write the embeddings to a JSONL file.
    JSONL ("JSON Lines") is the format Vertex AI Vector Search expects for upsert.
    """
    print("📝 Writing embeddings to JSONL")
    with open(outfile, "w") as f:
        for embedding in embeddings:
            f.write(json.dumps(embedding) + "\n")
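
# Each JSONL line holds one chunk. The id and numbers below are placeholders; the real
# embedding list contains 768 floats:
#   {"id": "40291", "text": "<chunk text>", "embedding": [0.0182, -0.0041, ...]}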


def store_embeddings_vavs(infile):
    """
    Upsert (stream) embeddings to the Vertex AI Vector Search index.
    https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.indexes/upsertDatapoints#IndexDatapoint
    """
    with open(infile) as f:
        lines = f.readlines()
    datapoints = []
    for line in lines:
        item = json.loads(line)
        d = {
            "datapoint_id": str(item["id"]),
            # Format the restricts field as a list of dictionaries
            "restricts": [{"namespace": "text", "allow_list": [item["text"]]}],
            "feature_vector": item["embedding"],
        }
        datapoints.append(d)

    print(f"⬆️ Upserting {len(datapoints)} embeddings to Vertex AI Vector Search")
    index.upsert_datapoints(datapoints=datapoints)
    print("✅ Done upserting.")
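
# Design note: Vector Search restricts are normally used for filtered queries, but here
# the "text" namespace doubles as storage for each chunk's raw text, so the text can be
# read back from a MatchNeighbor's restricts at query time (with return_full_datapoint=True).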


def extract_id_and_text(neighbor):
    """
    Extract ID and text from a Vertex AI Vector Search "MatchNeighbor" object
    """
    id_value = neighbor.id
    text_value = None
    if hasattr(neighbor, "restricts") and neighbor.restricts:
        for restrict in neighbor.restricts:
            if hasattr(restrict, "name") and restrict.name == "text":
                if hasattr(restrict, "allow_tokens") and restrict.allow_tokens:
                    text_value = restrict.allow_tokens[0]
                    break

    return {"id": id_value, "text": text_value}


def test_nearest_neighbors_query(q):
    """
    Test a query against the deployed Vertex AI Vector Search index.
    """
    response = genai_client.models.embed_content(
        model="text-embedding-005",
        contents=[q],
        config=EmbedContentConfig(
            task_type="RETRIEVAL_QUERY",
            output_dimensionality=768,
        ),
    )
    query_embedding = response.embeddings[0].values
    print(f"Query is: {q}")
    neighbors = index_endpoint.find_neighbors(
        deployed_index_id=os.getenv("VECTOR_SEARCH_DEPLOYED_INDEX_ID"),
        queries=[query_embedding],
        num_neighbors=3,
        return_full_datapoint=True,  # must be True so restricts (which carry the chunk text) are returned
    )

    print(f"Got # neighbors: {len(neighbors[0])}")
    for n in neighbors[0]:
        result = extract_id_and_text(n)
        print(f"ID: {result['id']}")
        print(f"Text: {result['text']}")
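
# find_neighbors returns one list of MatchNeighbor objects per query vector, so with a
# single query the matches live in neighbors[0].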


def ingest_text_document(filename):
    """
    Main ingestion function:
    - Downloads raw text from Cloud Storage
    - Chunks text
    - Generates embeddings
    - Writes embeddings to JSONL
    - Upserts the JSONL embeddings to Vertex AI Vector Search
    """
    gcs_bucket = os.getenv("GCS_BUCKET")
    raw_text = gcs_download_document(gcs_bucket, filename)
    print(f"\n📄 Raw text is char length: {len(raw_text)}")
    text_chunks = chunk_text(raw_text)
    print(f"\n✂️ Created {len(text_chunks)} text chunks from document.")
    embeddings = get_embeddings(text_chunks)
    print("🧠 Created 1 embedding per chunk.")
    write_embeddings_to_jsonl(embeddings, "embeddings.json")
    store_embeddings_vavs("embeddings.json")
    # Smoke test: query the index using the filename itself as the query text
    test_nearest_neighbors_query(filename)


@functions_framework.cloud_event
def process_data(cloud_event):
    """
    Process the CloudEvent data (a GCS file-upload notification) and trigger Vertex AI
    Vector Search ingestion for that file.
    """
    data = cloud_event.data
    print(f"CloudEvent data: \n {data}")
    # Example payload (Pub/Sub-wrapped GCS OBJECT_FINALIZE notification):
    """
    {'message': {'attributes': {'bucketId': 'ingest-67ab', 'eventTime': '2025-02-27T15:44:39.422831Z', 'eventType': 'OBJECT_FINALIZE', 'notificationConfig': 'projects/_/buckets/ingest-67ab/notificationConfigs/1', 'objectGeneration': '1740671079418498', 'objectId': 'willow_processor.txt', 'payloadFormat': 'JSON_API_V1'}, 'data': '...', 'messageId': '14113274556428337', 'message_id': '14113274556428337', 'publishTime': '2025-02-27T15:44:39.603Z', 'publish_time': '2025-02-27T15:44:39.603Z'}, 'subscription': 'projects/next25rag/subscriptions/eventarc-us-central1-ingestion-67ab-481379-sub-361'}
    """
    os.environ["GCS_BUCKET"] = data["message"]["attributes"]["bucketId"]
    os.environ["INPUT_DOC_FILENAME"] = data["message"]["attributes"]["objectId"]
    ingest_text_document(os.getenv("INPUT_DOC_FILENAME"))
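

# Minimal local-test sketch (an assumption for local runs, not part of the deployed
# Cloud Function): it presumes the PROJECT_ID, GCP_REGION, VECTOR_SEARCH_*, GCS_BUCKET,
# and INPUT_DOC_FILENAME environment variables are exported and that the target object
# already exists in the bucket.
if __name__ == "__main__":
    ingest_text_document(os.getenv("INPUT_DOC_FILENAME"))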