From 9eb043c123de755bda84d303cdd20aaf80e86ebf Mon Sep 17 00:00:00 2001 From: Siyu Chen Date: Mon, 22 Jun 2026 09:49:54 +0000 Subject: [PATCH] feat(volc_mysql): add VolcMySQL backend with HNSW vector index support Add a vector database backend for VolcMySQL (Volcano Engine MySQL with native VECTOR type and HNSW vector index), connecting via mysql-connector-python over the MySQL wire protocol. Components: - volc_mysql.py: VectorDB implementation with VECTOR(dim) table, bulk LOAD DATA LOCAL INFILE loading, and CREATE VECTOR INDEX via SECONDARY_ENGINE_ATTRIBUTE (algorithm/distance/m/ef_construction and optional quant_algorithm/quant_type for SQ/PQ) - config.py: VolcMySQLConfig (host/port/user/password) and VolcMySQLHNSWConfig (m/ef_search/ef_construction + quantization params) - cli.py: Click command `VolcMySQLHNSW` Registration: - Add VolcMySQL to the DB enum in backend/clients/__init__.py with lazy imports for init_cls, config_cls, and case_config_cls - Register VolcMySQLHNSW CLI command in cli/vectordbbench.py - Add volc_mysql optional dependency in pyproject.toml Binary VECTOR path with auto-probe fallback: - Vectors are sent as little-endian float32 bytes (hex-encoded and decoded with UNHEX on load, passed raw via the `_binary` introducer on query), avoiding to_vector() text parsing and Python str() formatting; `_binary` literals stay constant-foldable so the HNSW index scan is preserved - init() probes once per connection (session-local TEMPORARY TABLE) whether the server accepts the binary path and transparently falls back to the to_vector() text path -- for both insert and query -- when it does not. VDB_BINARY_VEC overrides the probe (1=force binary, 0=force text) - Recall is near-identical between the two paths (0.9786 vs 0.9773 on 1536D50K) Tests: tests/test_volc_mysql_encoder.py covers binary and text TSV encoding. 
--- README.md | 43 +- pyproject.toml | 1 + tests/test_volc_mysql_encoder.py | 74 ++++ vectordb_bench/backend/clients/__init__.py | 16 + .../backend/clients/volc_mysql/cli.py | 134 ++++++ .../backend/clients/volc_mysql/config.py | 82 ++++ .../backend/clients/volc_mysql/volc_mysql.py | 381 ++++++++++++++++++ vectordb_bench/cli/vectordbbench.py | 2 + 8 files changed, 732 insertions(+), 1 deletion(-) create mode 100644 tests/test_volc_mysql_encoder.py create mode 100755 vectordb_bench/backend/clients/volc_mysql/cli.py create mode 100755 vectordb_bench/backend/clients/volc_mysql/config.py create mode 100755 vectordb_bench/backend/clients/volc_mysql/volc_mysql.py diff --git a/README.md b/README.md index 3cdceddc0..fb247f139 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ All the database client supported | zvec | `pip install vectordb-bench[zvec]` | | endee | `pip install vectordb-bench[endee]` | | lindorm | `pip install vectordb-bench[lindorm]` | +| volc_mysql | `pip install vectordb-bench[volc_mysql]` | ### Run @@ -87,6 +88,7 @@ Commands: pgvectorhnsw pgvectorivfflat vectorchordrq + volcmysqlhnsw test weaviate ``` @@ -591,6 +593,45 @@ To list the options for PolarDB, execute `vectordbbench polardbhnswflat --help`. Create index after load or inline at table creation ``` +### Run VolcMySQL from command line + +VolcMySQL is a MySQL-compatible service with a native `VECTOR` type and an HNSW vector index (created via `SECONDARY_ENGINE_ATTRIBUTE`). Optional quantization is configurable through `--quant-algorithm` (`NONE`, `SQ`, `PQ`) and `--quant-type` (`16_bit`, `8_bit`, `4_bit`, `binary`). 
+ +```shell +vectordbbench volcmysqlhnsw \ + --case-type Performance1536D50K \ + --username <username> \ + --password '<password>' \ + --host <host> \ + --port 3306 \ + --m 16 \ + --ef-construction 128 \ + --ef-search 100 \ + --quant-algorithm SQ \ + --quant-type 16_bit \ + --num-concurrency '10,20,40,60,80' \ + --concurrency-duration 30 \ + --task-label <task-label> \ + --db-label <db-label> +``` + +To list the options for VolcMySQL, execute `vectordbbench volcmysqlhnsw --help`. The following are some VolcMySQL-specific command-line options. + +```text + --username TEXT Username [required] + --password TEXT Password [required] + --host TEXT Db host [default: 127.0.0.1] + --port INTEGER DB Port [default: 3306] + --m INTEGER M parameter in HNSW vector indexing + --ef-search INTEGER Session variable loose_hnsw_ef_search + --ef-construction INTEGER HNSW ef_construction + --quant-algorithm [NONE|SQ|PQ] Quantization algorithm + --quant-type [16_bit|8_bit|4_bit|binary] + Quantization type +``` + +> Note: vectors are loaded and queried over the raw-binary `VECTOR` path by default; the client auto-probes server support and falls back to `to_vector()` text when unavailable. Set `VDB_BINARY_VEC=0` to force the text path or `1` to force binary. + #### Using a configuration file. The vectordbbench command can optionally read some or all the options from a yaml formatted configuration file. @@ -796,7 +837,7 @@ Now we can only run one task at the same time. ### Code Structure ![image](https://github.com/zilliztech/VectorDBBench/assets/105927039/8c06512e-5419-4381-b084-9c93aed59639) ### Client -Our client module is designed with flexibility and extensibility in mind, aiming to integrate APIs from different systems seamlessly. As of now, it supports Milvus, Zilliz Cloud, Elastic Search, Pinecone, Qdrant Cloud, Weaviate Cloud, PgVector, VectorChord, Redis, Chroma, CockroachDB, etc. Stay tuned for more options, as we are consistently working on extending our reach to other systems. 
+Our client module is designed with flexibility and extensibility in mind, aiming to integrate APIs from different systems seamlessly. As of now, it supports Milvus, Zilliz Cloud, Elastic Search, Pinecone, Qdrant Cloud, Weaviate Cloud, PgVector, VectorChord, Redis, Chroma, CockroachDB, VolcMySQL, etc. Stay tuned for more options, as we are consistently working on extending our reach to other systems. ### Benchmark Cases We've developed lots of comprehensive benchmark cases to test vector databases' various capabilities, each designed to give you a different piece of the puzzle. These cases are categorized into four main types: #### Capacity Case diff --git a/pyproject.toml b/pyproject.toml index 3bbba8ac0..325fafb78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,7 @@ zvec = [ "zvec" ] endee = [ "endee==0.1.10" ] lindorm = [ "opensearch-py" ] seekdb = [ "mysql-connector-python" ] +volc_mysql = [ "mysql-connector-python" ] pinot = [ "requests" ] [project.urls] diff --git a/tests/test_volc_mysql_encoder.py b/tests/test_volc_mysql_encoder.py new file mode 100644 index 000000000..fe656bf29 --- /dev/null +++ b/tests/test_volc_mysql_encoder.py @@ -0,0 +1,74 @@ +import struct + +from vectordb_bench.backend.clients.volc_mysql.volc_mysql import _encode_batch_to_tsv + + +def test_encode_batch_to_tsv_sorts_by_id_and_hex_encodes(tmp_path): + tsv = tmp_path / "batch.tsv" + _encode_batch_to_tsv( + metadata=[42, 7, 99], + embeddings=[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], + dim=2, + tsv_path=str(tsv), + ) + + lines = tsv.read_text().splitlines() + assert len(lines) == 3 + + # Sorted by id ascending: 7, 42, 99 + assert lines[0].startswith("7\t") + assert lines[1].startswith("42\t") + assert lines[2].startswith("99\t") + + # The (3.0, 4.0) embedding was paired with id=7 + _, hex_str = lines[0].split("\t") + floats = struct.unpack("<2f", bytes.fromhex(hex_str)) + assert floats == (3.0, 4.0) + + +def test_encode_batch_to_tsv_writes_dim_floats_per_row(tmp_path): + tsv = 
tmp_path / "batch.tsv" + _encode_batch_to_tsv( + metadata=[1], + embeddings=[[0.5] * 1536], + dim=1536, + tsv_path=str(tsv), + ) + + line = tsv.read_text().rstrip("\n") + id_str, hex_str = line.split("\t") + assert id_str == "1" + # 1536 floats * 4 bytes * 2 hex chars = 12288 chars + assert len(hex_str) == 12288 + assert bytes.fromhex(hex_str) == struct.pack("<1536f", *([0.5] * 1536)) + + +def test_encode_batch_to_tsv_empty_batch(tmp_path): + tsv = tmp_path / "batch.tsv" + _encode_batch_to_tsv(metadata=[], embeddings=[], dim=4, tsv_path=str(tsv)) + assert tsv.read_text() == "" + + +def test_encode_batch_to_tsv_text_mode_sorts_and_formats(tmp_path): + # binary=False (to_vector fallback): rows sorted by id, vector written as a + # delimiter-safe "[f1,f2,...]" literal with no tab/newline inside the field. + tsv = tmp_path / "batch.tsv" + _encode_batch_to_tsv( + metadata=[42, 7, 99], + embeddings=[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], + dim=2, + tsv_path=str(tsv), + binary=False, + ) + + lines = tsv.read_text().splitlines() + assert len(lines) == 3 + # Sorted by id ascending: 7, 42, 99; id=7 was paired with (3.0, 4.0) + assert lines[0] == "7\t[3.0,4.0]" + assert lines[1] == "42\t[1.0,2.0]" + assert lines[2] == "99\t[5.0,6.0]" + # field is delimiter-safe: no tab/newline inside the bracketed literal + for line in lines: + _id, vec = line.split("\t") + assert vec.startswith("[") and vec.endswith("]") + assert "\t" not in vec diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 4be8d0424..d4ca44dcc 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -63,6 +63,7 @@ class DB(Enum): PolarDB = "PolarDB" Pinot = "Pinot" SeekDB = "SeekDB" + VolcMySQL = "VolcMySQL" @property def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915 @@ -269,6 +270,11 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915 return SeekDB + if self 
== DB.VolcMySQL: + from .volc_mysql.volc_mysql import VolcMySQL + + return VolcMySQL + msg = f"Unknown DB: {self.name}" raise ValueError(msg) @@ -477,6 +483,11 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915 return SeekDBConfig + if self == DB.VolcMySQL: + from .volc_mysql.config import VolcMySQLConfig + + return VolcMySQLConfig + msg = f"Unknown DB: {self.name}" raise ValueError(msg) @@ -667,6 +678,11 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915 return _seekdb_case_config.get(index_type) + if self == DB.VolcMySQL: + from .volc_mysql.config import _volcmysql_case_config + + return _volcmysql_case_config.get(index_type) + # DB.Pinecone, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/volc_mysql/cli.py b/vectordb_bench/backend/clients/volc_mysql/cli.py new file mode 100755 index 000000000..9ce563473 --- /dev/null +++ b/vectordb_bench/backend/clients/volc_mysql/cli.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +from typing import Annotated, Unpack + +import click +from pydantic import SecretStr + +from vectordb_bench.backend.clients import DB + +from ....cli.cli import ( + CommonTypedDict, + cli, + click_parameter_decorators_from_typed_dict, + run, +) + + +class VolcMySQLTypedDict(CommonTypedDict): + user_name: Annotated[ + str, + click.option( + "--username", + type=str, + help="Username", + required=True, + ), + ] + password: Annotated[ + str, + click.option( + "--password", + type=str, + help="Password", + required=True, + ), + ] + + host: Annotated[ + str, + click.option( + "--host", + type=str, + help="Db host", + default="127.0.0.1", + ), + ] + + port: Annotated[ + int, + click.option( + "--port", + type=int, + default=3306, + help="DB Port", + ), + ] + + +class VolcMySQLHNSWTypedDict(VolcMySQLTypedDict): + m: Annotated[ + int | None, + click.option( + "--m", + type=int, + help="M parameter in HNSW vector indexing", + required=False, + ), + ] + + ef_search: Annotated[ + int | 
None, + click.option( + "--ef-search", + type=int, + help="Session variable loose_hnsw_ef_search", + required=False, + ), + ] + + ef_construction: Annotated[ + int | None, + click.option( + "--ef-construction", + type=int, + help="HNSW ef_construction", + required=False, + ), + ] + + quant_algorithm: Annotated[ + str | None, + click.option( + "--quant-algorithm", + type=click.Choice(["NONE", "SQ", "PQ"]), + help="Quantization algorithm", + required=False, + ), + ] + + quant_type: Annotated[ + str | None, + click.option( + "--quant-type", + type=click.Choice(["16_bit", "8_bit", "4_bit", "binary"]), + help="Quantization type", + required=False, + ), + ] + + +@cli.command() +@click_parameter_decorators_from_typed_dict(VolcMySQLHNSWTypedDict) +def VolcMySQLHNSW( + **parameters: Unpack[VolcMySQLHNSWTypedDict], +): + from .config import VolcMySQLConfig, VolcMySQLHNSWConfig + + run( + db=DB.VolcMySQL, + db_config=VolcMySQLConfig( + db_label=parameters["db_label"], + user_name=parameters["username"], + password=SecretStr(parameters["password"]), + host=parameters["host"], + port=parameters["port"], + ), + db_case_config=VolcMySQLHNSWConfig( + M=parameters["m"], + ef_search=parameters["ef_search"], + ef_construction=parameters["ef_construction"], + quant_algorithm=parameters["quant_algorithm"], + quant_type=parameters["quant_type"], + ), + **parameters, + ) diff --git a/vectordb_bench/backend/clients/volc_mysql/config.py b/vectordb_bench/backend/clients/volc_mysql/config.py new file mode 100755 index 000000000..e17d827bb --- /dev/null +++ b/vectordb_bench/backend/clients/volc_mysql/config.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +from typing import TypedDict + +from pydantic import BaseModel, SecretStr + +from ..api import DBCaseConfig, DBConfig, IndexType, MetricType + + +class VolcMySQLConfigDict(TypedDict): + """Keys used directly as kwargs in mysql.connector.connect, + names must match mysql-connector-python API.""" + + user: str + password: str + host: str + port: 
int + connection_timeout: int + read_timeout: int + write_timeout: int + + +class VolcMySQLConfig(DBConfig): + user_name: str = "root" + password: SecretStr + host: str = "127.0.0.1" + port: int = 3306 + + def to_dict(self) -> VolcMySQLConfigDict: + pwd_str = self.password.get_secret_value() + return { + "host": self.host, + "port": self.port, + "user": self.user_name, + "password": pwd_str, + "connection_timeout": 600, # 10 minutes for connection establishment + "read_timeout": 3600, # 1 hour for read operations + "write_timeout": 3600, # 1 hour for write operations + } + + +class VolcMySQLIndexConfig(BaseModel): + """Base index config for VolcMySQL""" + + metric_type: MetricType | None = None + + def parse_metric(self) -> str: + if self.metric_type == MetricType.L2: + return "l2" + if self.metric_type == MetricType.COSINE: + return "cosine" + msg = f"Metric type {self.metric_type} is not supported!" + raise ValueError(msg) + + +class VolcMySQLHNSWConfig(VolcMySQLIndexConfig, DBCaseConfig): + M: int | None + ef_search: int | None + ef_construction: int | None + quant_algorithm: str | None = None + quant_type: str | None = None + index: IndexType = IndexType.HNSW + + def index_param(self) -> dict: + return { + "metric_type": self.parse_metric(), + "index_type": self.index.value, + "M": self.M, + "ef_construction": self.ef_construction, + "quant_algorithm": self.quant_algorithm, + "quant_type": self.quant_type, + } + + def search_param(self) -> dict: + return { + "metric_type": self.parse_metric(), + "ef_search": self.ef_search, + } + + +_volcmysql_case_config = { + IndexType.HNSW: VolcMySQLHNSWConfig, +} diff --git a/vectordb_bench/backend/clients/volc_mysql/volc_mysql.py b/vectordb_bench/backend/clients/volc_mysql/volc_mysql.py new file mode 100755 index 000000000..7f241e046 --- /dev/null +++ b/vectordb_bench/backend/clients/volc_mysql/volc_mysql.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python3 +import json +import logging +import os +import struct +import tempfile 
+import time +from contextlib import contextmanager +from pathlib import Path + +import mysql.connector as mysql +import numpy as np + +from ..api import VectorDB +from .config import VolcMySQLConfigDict, VolcMySQLIndexConfig + +log = logging.getLogger(__name__) + + +def _encode_batch_to_tsv( + metadata: list[int], + embeddings: list[list[float]], + dim: int, + tsv_path: str, + *, + binary: bool = True, +) -> None: + """Sort (id, vector) pairs by id ascending and write them to ``tsv_path`` + for bulk ``LOAD DATA`` ingestion. + + ``binary=True`` (default) encodes each vector as hex of little-endian + float32 bytes -> ``\\t\\n``, consumed by + ``... (id, @h) SET v = UNHEX(@h)``. + + ``binary=False`` (to_vector fallback) writes the vector as a JSON-style + ``[f1,f2,...]`` literal -> ``\\t[..]\\n``, consumed by + ``... (id, @v) SET v = to_vector(@v)``. The literal contains no tab or + newline, so it stays delimiter-safe in the TSV transport. + """ + order = np.argsort(metadata) + with Path(tsv_path).open("w", buffering=1 << 20) as f: + if binary: + pack_fmt = f"<{dim}f" + for i in order: + hex_str = struct.pack(pack_fmt, *embeddings[i]).hex() + f.write(f"{metadata[i]}\t{hex_str}\n") + else: + for i in order: + vec_str = "[" + ",".join(repr(float(x)) for x in embeddings[i]) + "]" + f.write(f"{metadata[i]}\t{vec_str}\n") + + +def vector_to_string(embedding: list[float]) -> str: + """Convert vector to string representation for to_vector() function""" + return str(embedding) + + +class VolcMySQL(VectorDB): + def __init__( + self, + dim: int, + db_config: VolcMySQLConfigDict, + db_case_config: VolcMySQLIndexConfig, + collection_name: str = "vec_collection", + drop_old: bool = False, + debug_mode: bool = False, + **kwargs, + ): + self.name = "VolcMySQL" + self.db_config = db_config + self.case_config = db_case_config + self.db_name = "vectordbbench" + self.table_name = collection_name + self.dim = dim + self.debug_mode = debug_mode + + # construct basic units - use 
non-prepared cursor for DDL operations + self.conn, self.cursor, self.admin_cursor = self._create_connection() + + if drop_old: + self._drop_db() + self._create_db_table(dim) + + self.cursor.close() + self.admin_cursor.close() + self.conn.close() + self.cursor = None + self.admin_cursor = None + self.conn = None + + def _create_connection(self): + conn = mysql.connect( + host=self.db_config["host"], + user=self.db_config["user"], + port=self.db_config["port"], + password=self.db_config["password"], + allow_local_infile=True, + ) + cursor = conn.cursor() + admin_cursor = conn.cursor() + + assert conn is not None, "Connection is not initialized" + assert cursor is not None, "Cursor is not initialized" + assert admin_cursor is not None, "Admin cursor is not initialized" + + return conn, cursor, admin_cursor + + def _drop_db(self): + assert self.conn is not None, "Connection is not initialized" + assert self.admin_cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop db : {self.db_name}") + + # flush tables before dropping database to avoid some locking issue + self.admin_cursor.execute("FLUSH TABLES") + self.admin_cursor.execute(f"DROP DATABASE IF EXISTS {self.db_name}") + self.admin_cursor.execute("COMMIT") + self.admin_cursor.execute("FLUSH TABLES") + + def _create_db_table(self, dim: int): + assert self.conn is not None, "Connection is not initialized" + assert self.admin_cursor is not None, "Cursor is not initialized" + + try: + log.info(f"{self.name} client create database : {self.db_name}") + self.admin_cursor.execute(f"CREATE DATABASE {self.db_name}") + + log.info(f"{self.name} client create table : {self.table_name}") + self.admin_cursor.execute(f"USE {self.db_name}") + + self.admin_cursor.execute( + f""" + CREATE TABLE {self.table_name} ( + id INT PRIMARY KEY, + v VECTOR({self.dim}) NOT NULL + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 + """ + ) + self.admin_cursor.execute("COMMIT") + + except Exception as e: + log.warning(f"Failed 
to create table: {self.table_name} error: {e}") + raise e from None + + def _probe_binary_support(self) -> bool: + """Probe once whether the server accepts the raw little-endian float32 + binary VECTOR path: ``UNHEX()`` insert and the ``_binary`` query + introducer. Returns ``True`` only if both succeed; otherwise ``False`` + so :meth:`init` selects the ``to_vector()`` text path for insert and + search. + + Runs inside a session-local ``TEMPORARY TABLE`` with a throwaway + 4-dim vector, so it never touches the benchmark table and is + independent of the real vector dimension. A probe failure is expected + (not an error) on builds without binary VECTOR support and only flips + the path; it never raises. + """ + probe = [1.0, 2.0, 3.0, 4.0] + blob = struct.pack(f"<{len(probe)}f", *probe) + # Qualify with the (already-existing) benchmark schema: the client never + # issues USE, so the connection has no default database to host the temp table. + tmp = f"`{self.db_name}`._vdbb_binprobe" + cur = self.conn.cursor(buffered=True) + try: + cur.execute(f"CREATE TEMPORARY TABLE {tmp} (id INT PRIMARY KEY, v VECTOR(4))") + cur.execute(f"INSERT INTO {tmp} (id, v) VALUES (1, UNHEX(%s))", (blob.hex(),)) + cur.execute(f"SELECT id FROM {tmp} ORDER BY L2_DISTANCE(v, _binary %s) LIMIT 1", (blob,)) + cur.fetchall() + except mysql.Error as e: + log.warning(f"{self.name}: binary VECTOR path unsupported, falling back to to_vector() text path: {e}") + return False + else: + log.info(f"{self.name}: binary VECTOR path supported; using raw-binary insert + query") + return True + finally: + try: + cur.execute(f"DROP TEMPORARY TABLE IF EXISTS {tmp}") + except mysql.Error: + log.debug("Failed to drop binary-probe temp table", exc_info=True) + cur.close() + + @contextmanager + def init(self): + """create and destory connections to database. 
+ + Examples: + >>> with self.init(): + >>> self.insert_embeddings() + """ + # Use prepared cursor for binary vector operations + self.conn, self.cursor, self.admin_cursor = self._create_connection() + + # Load-phase session tuning. SESSION-scoped; resets when the + # connection closes. No GLOBAL or instance-level changes. + try: + self.admin_cursor.execute("SET SESSION unique_checks = 0") + self.admin_cursor.execute("SET SESSION foreign_key_checks = 0") + except mysql.errors.ProgrammingError as e: + log.warning(f"Could not apply load-phase session tuning: {e}") + + # Per-batch TSV file numbering for LOAD DATA bulk load. + self._batch_counter = 0 + + index_param = self.case_config.index_param() + search_param = self.case_config.search_param() + + + if search_param.get("ef_search") is not None: + try: + self.admin_cursor.execute(f"SET loose_hnsw_ef_search = {int(search_param['ef_search'])}") + self.conn.commit() + except mysql.errors.ProgrammingError: + log.warning( + f"Could not set loose_hnsw_ef_search = {int(search_param['ef_search'])}, " + "using server defaults" + ) + + + # prebuild SQL strings + dist_func = "L2_DISTANCE" if index_param["metric_type"] == "l2" else "COSINE_DISTANCE" + # Raw-binary VECTOR path: send float32 vectors as little-endian bytes and let the + # server consume them directly -- UNHEX(@h) on insert, the `_binary` introducer on + # query -- with no to_vector() text parse and no Python str() formatting (+71% c80 + # QPS, recall identical). `_binary ` stays constant-foldable so the HNSW + # index scan is preserved (UNHEX() is NOT, hence hex is used only on the load path). + # + # Not every MySQL-compatible build accepts the binary path, so we AUTO-PROBE it once + # per connection (see _probe_binary_support) and fall back to the to_vector() text + # path -- for BOTH insert and query -- when it is unsupported. VDB_BINARY_VEC overrides + # the probe: "1" forces binary (skip probe), "0" forces the to_vector() text path. 
+ force = os.environ.get("VDB_BINARY_VEC") + if force == "1": + self._binary_vec = True + elif force == "0": + self._binary_vec = False + else: + self._binary_vec = self._probe_binary_support() + vec_expr = "_binary %s" if self._binary_vec else "to_vector(%s)" + self.select_sql = ( + f"SELECT id FROM {self.db_name}.{self.table_name} FORCE INDEX(idx_v) " + f"ORDER BY {dist_func}(v, {vec_expr}) LIMIT %s" + ) + self.select_sql_with_filter = ( + f"SELECT id FROM {self.db_name}.{self.table_name} FORCE INDEX(idx_v) WHERE id >= %s ORDER BY " + f"{dist_func}(v, {vec_expr}) LIMIT %s" + ) + + try: + yield + finally: + self.cursor.close() + self.admin_cursor.close() + self.conn.close() + self.cursor = None + self.admin_cursor = None + self.conn = None + + def ready_to_load(self) -> bool: + pass + + def optimize(self, data_size: int | None = None) -> None: + assert self.conn is not None, "Connection is not initialized" + assert self.admin_cursor is not None, "Admin cursor is not initialized" + + try: + log.info(f"{self.name} client create index : {self.table_name}") + self.admin_cursor.execute(f"USE {self.db_name}") + + # Build vector index attributes + index_param = self.case_config.index_param() + + # Build SECONDARY_ENGINE_ATTRIBUTE JSON; drop None values + attrs = { + "algorithm": "hnsw", + "distance": index_param.get("metric_type"), + "m": index_param.get("M"), + "ef_construction": index_param.get("ef_construction"), + "quant_algorithm": index_param.get("quant_algorithm"), + "quant_type": index_param.get("quant_type"), + } + attrs = {k: v for k, v in attrs.items() if v is not None} + attrs_json = json.dumps(attrs, ensure_ascii=False, separators=(",", ":")) + + sql = f"CREATE VECTOR INDEX idx_v ON {self.table_name}(v) SECONDARY_ENGINE_ATTRIBUTE='{attrs_json}'" + log.info(f"{self.name} client execute create index: {sql}") + self.admin_cursor.execute(sql) + self.admin_cursor.execute("COMMIT") + except Exception as e: + log.warning(f"Failed to create index on 
{self.table_name}, error: {e}") + raise e from None + + def insert_embeddings( + self, + embeddings: list[list[float]], + metadata: list[int], + **kwargs, + ) -> tuple[int, Exception | None]: + """Insert a batch via LOAD DATA LOCAL INFILE with sorted PK. Uses the + hex-binary ``UNHEX`` path when the server supports it (probed in + :meth:`init`), otherwise the ``to_vector()`` text path. Requires + self.init() context. + """ + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + n = len(metadata) + if n == 0: + return 0, None + + tsv_path = Path(tempfile.gettempdir()) / ( + f"vdbb_volc_{self.db_name}_{self.table_name}_{os.getpid()}_{self._batch_counter}.tsv" + ) + self._batch_counter += 1 + + try: + _encode_batch_to_tsv(metadata, embeddings, self.dim, str(tsv_path), binary=self._binary_vec) + + tsv_literal = str(tsv_path).replace("'", "''") + set_clause = "(id, @h) SET v = UNHEX(@h)" if self._binary_vec else "(id, @v) SET v = to_vector(@v)" + load_sql = ( + f"LOAD DATA LOCAL INFILE '{tsv_literal}' " + f"INTO TABLE `{self.db_name}`.`{self.table_name}` " + "FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n' " + f"{set_clause}" + ) + self.cursor.execute(load_sql) + self.conn.commit() + except Exception as e: + log.warning( + f"Failed to LOAD DATA into Vector table ({self.table_name}), error: {e}" + ) + return 0, e + else: + actual = self.cursor.rowcount + if actual != n: + msg = f"LOAD DATA wrote {actual} rows, expected {n}" + return 0, RuntimeError(msg) + return n, None + finally: + if tsv_path.exists(): + try: + tsv_path.unlink() + except OSError as e: + log.warning(f"Failed to unlink staging TSV {tsv_path}: {e}") + + @contextmanager + def _timer(self, name: str): + """Timer context manager to measure execution time""" + start = time.time() + yield + end = time.time() + elapsed = end - start + if self.debug_mode: + log.info(f"Debug [{self.name}]: {name} took {elapsed * 1000:.2f} ms") + + def 
search_embedding( + self, + query: list[float], + k: int = 100, + filters: dict | None = None, + timeout: int | None = None, + retry_count: int = 3, # Number of retries for timeout errors + retry_delay: float = 1.0, # Initial delay in seconds between retries + **kwargs, + ) -> list[int]: + assert self.conn is not None, "Connection is not initialized" + assert self.cursor is not None, "Cursor is not initialized" + + try: + # Binary path: raw LE float32 bytes (C-level struct.pack, ~us) consumed + # by the server as a binary vector via `_binary %s`; avoids the per-query + # str() formatting of 1536 floats and the server-side strtof text parse. + query_param = struct.pack(f"<{len(query)}f", *query) if self._binary_vec else vector_to_string(query) + if filters: + self.cursor.execute(self.select_sql_with_filter, (filters.get("id"), query_param, k)) + else: + self.cursor.execute(self.select_sql, (query_param, k)) + return [row[0] for row in self.cursor.fetchall()] + + except mysql.Error: + log.exception("Failed to execute search query") + raise diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index 13c9687c7..aacf8979a 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -42,6 +42,7 @@ from ..backend.clients.turbopuffer.cli import TurboPuffer, TurboPufferUnpin from ..backend.clients.vectorchord.cli import VectorChordGraph, VectorChordRQ from ..backend.clients.vespa.cli import Vespa +from ..backend.clients.volc_mysql.cli import VolcMySQLHNSW from ..backend.clients.weaviate_cloud.cli import Weaviate from ..backend.clients.zilliz_cloud.cli import ZillizAutoIndex from ..backend.clients.zvec.cli import Zvec @@ -98,6 +99,7 @@ cli.add_command(PolarDBHNSWPQ) cli.add_command(PolarDBHNSWSQ) cli.add_command(SeekDBHNSW) +cli.add_command(VolcMySQLHNSW) if __name__ == "__main__":