diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..ff5b875 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,34 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: pytest + +on: + # Trigger the workflow on push to main or any pull request + push: + branches: ["main", "develop"] + pull_request: + types: ["opened", "synchronize"] + +jobs: + main: + strategy: + matrix: + platform: ["ubuntu-latest"] + python-version: ["3.9", "3.10", "3.11", "3.12"] + runs-on: ${{ matrix.platform }} + steps: + - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Set up Python ${{ matrix.python-version }} + run: uv python install + - name: Install the project + run: | + uv sync --all-extras --dev + uv pip install faiss-cpu + - name: Test with pytest + run: | + uv run pytest diff --git a/.github/workflows/release_pypi.yaml b/.github/workflows/release_pypi.yaml new file mode 100644 index 0000000..4303afb --- /dev/null +++ b/.github/workflows/release_pypi.yaml @@ -0,0 +1,19 @@ +name: Publish Package + +on: + release: + types: [published] + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + python-version: "3.9" + - name: Build the package + run: uv build + - name: Publish the package + run: uv publish --token ${{ secrets.PYPI_TOKEN }} diff --git a/.gitignore b/.gitignore index b6e4761..7eeefed 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,4 @@ dmypy.json # Pyre type checker .pyre/ +uv.lock \ No newline at end of file diff --git a/README.rst b/README.rst index 6ceaaf3..c9fd952 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,15 @@ SEMSIS: Semantic Similarity Search ################################## +|PyPI version| |PyPI license| |CI| + +.. |PyPI version| image:: https://img.shields.io/pypi/v/semsis.svg + :target: https://pypi.python.org/pypi/semsis +.. |PyPI license| image:: https://img.shields.io/pypi/l/semsis.svg + :target: https://pypi.python.org/pypi/semsis +.. |CI| image:: https://github.com/de9uch1/semsis/actions/workflows/ci.yaml/badge.svg + :target: https://github.com/de9uch1/semsis + *Semsis* is a library for semantic similarity search. It is designed to focus on the following goals: @@ -19,12 +28,28 @@ REQUIREMENTS INSTALLATION ============ +via pip: + +.. code:: bash + + pip install semsis + +from the source: + .. code:: bash git clone https://github.com/de9uch1/semsis.git cd semsis/ pip install ./ +from the source with uv: + +.. code:: bash + + git clone https://github.com/de9uch1/semsis.git + cd semsis/ + uv sync + USAGE ===== @@ -34,7 +59,7 @@ Case 1: Use semsis as API You can see the example of text search in `end2end_test.py <./tests/end2end_test.py>`_. Note that this example is not optimized for billion-scale index construction. -If you find the efficient implementation, please see `semsis/cli/README.rst <./semsis/cli/README.rst>`_. 
+If you find the efficient implementation, please see `src/semsis/cli/README.rst <./src/semsis/cli/README.rst>`_. 1. Encode the sentences and store in a key--value store. @@ -104,7 +129,7 @@ Case 2: Use semsis as command line scripts ------------------------------------------ Command line scripts are carefully designed to run efficiently for the billion-scale search. -See `semsis/cli/README.rst <./semsis/cli/README.rst>`_. +See `src/semsis/cli/README.rst <./src/semsis/cli/README.rst>`_. LICENSE diff --git a/pyproject.toml b/pyproject.toml index 8f3a1c7..03180e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,34 +1,38 @@ -[tool.poetry] +[project] name = "semsis" -version = "0.1.2" +version = "0.1.3" description = "A libary for semantic similarity search" -authors = ["Hiroyuki Deguchi "] -license = "MIT" readme = "README.rst" -packages = [{include = "semsis"}] +authors = [ + { name = "Hiroyuki Deguchi", email = "deguchi.hiroyuki@nict.go.jp" } +] +requires-python = ">=3.9" +dependencies = [ + "h5py>=3.12.1", + "sentence-transformers>=3.3.1", + "simple-parsing>=0.1.7", + "torch>=2.0.0", + "transformers>=4.47.1", +] -[tool.poetry.dependencies] -python = "^3.8.1" -torch = "^2.0.0" -transformers = "^4.33.2" -sentence-transformers = "^2.2.2" -h5py = "^3.9.0" +[project.scripts] +semsis = "semsis:main" +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[dependency-groups] +dev = [ + "mypy>=1.14.1", + "pre-commit>=4.0.1", + "pytest>=8.3.4", + "pytest-cov>=6.0.0", + "ruff>=0.8.6", +] -[tool.poetry.group.dev.dependencies] -black = "^23.9.1" -isort = "^5.12.0" -flake8 = "^6.1.0" -mypy = "^1.5.1" -pytest = "^7.4.2" -pytest-cov = "^4.1.0" +[tool.ruff.lint] +extend-select = ["I"] [tool.coverage.run] omit = ["*_test.py"] - -[tool.isort] -profile = "black" - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" diff --git a/semsis/cli/build_retriever.py b/semsis/cli/build_retriever.py deleted file mode 100644 index 380777c..0000000 --- a/semsis/cli/build_retriever.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python3 -import contextlib -import logging -import math -import os -import sys -from argparse import ArgumentParser, Namespace - -import numpy as np -import torch -from tqdm import tqdm - -from semsis.kvstore import KVStore -from semsis.retriever import Retriever -from semsis.utils import Stopwatch - -logging.basicConfig( - format="| %(asctime)s | %(levelname)s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - level="INFO", - stream=sys.stdout, -) -logger = logging.getLogger("semsis.cli.build_retriever") - - -def parse_args() -> Namespace: - parser = ArgumentParser() - # fmt: off - parser.add_argument("--kvstore", metavar="FILE", nargs="+", required=True, - help="Path to key--value store files.") - parser.add_argument("--index-path", metavar="FILE", required=True, - help="Path to an index file.") - parser.add_argument("--trained-index-path", metavar="FILE", - help="Path to a trained index file. 
If this option is not specified, " - "the final index path will be set.") - parser.add_argument("--config-path", metavar="FILE", required=True, - help="Path to a configuration file.") - parser.add_argument("--cpu", action="store_true", - help="Only use CPU.") - parser.add_argument("--fp16", action="store_true", - help="Use FP16.") - parser.add_argument("--backend", metavar="NAME", type=str, default="faiss-cpu", - help="Backend of the search engine.") - parser.add_argument("--metric", metavar="TYPE", default="l2", choices=["l2", "ip", "cos"], - help="Distance function.") - parser.add_argument("--chunk-size", metavar="N", type=int, default=1000000, - help="The number of data to be loaded at a time.") - parser.add_argument("--train-size", metavar="N", type=int, default=1000000, - help="The number of training data.") - parser.add_argument("--hnsw-nlinks", metavar="N", type=int, default=0, - help="[HNSW] The number of links per node.") - parser.add_argument("--ivf-nlists", metavar="N", type=int, default=0, - help="[IVF] The number of centroids") - parser.add_argument("--pq-nblocks", metavar="N", type=int, default=0, - help="[PQ] The number of sub-vectors") - parser.add_argument("--opq", action="store_true", - help="Use OPQ to minimize the quantization error.") - parser.add_argument("--pca", action="store_true", - help="Use PCA to reduce the dimension size.") - parser.add_argument("--pca-dim", metavar="N", type=int, default=-1, - help="The dimension size which is reduced by PCA.") - parser.add_argument("--append-sequential", action="store_true", - help="Append entries from the tail.") - parser.add_argument("--save-checkpoint", action="store_true", - help="Save checkpoint after adding each key--value set.") - # fmt: on - return parser.parse_args() - - -def train_retriever( - args: Namespace, training_vectors: np.ndarray, use_gpu: bool = False -) -> Retriever: - train_size, dim = training_vectors.shape - retriever_type = Retriever.get_cls(args.backend) - dim = training_vectors.shape[1] - cfg_dc = retriever_type.Config - cfg_dict = {"dim": dim, "backend": args.backend, "metric": args.metric} - - def set_if_hasattr(name: str): - if hasattr(cfg_dc, name): - cfg_dict[name] = getattr(args, name) - - set_if_hasattr("hnsw_nlinks") - set_if_hasattr("ivf_nlists") - set_if_hasattr("pq_nblocks") - set_if_hasattr("opq") - set_if_hasattr("pca") - set_if_hasattr("pca_dim") - set_if_hasattr("fp16") - cfg = retriever_type.Config(**cfg_dict) - logger.info(cfg) - logger.info(f"Input vector: {cfg.dim} dimension") - - if getattr(args, "trained_index_path", None): - trained_index_path = args.trained_index_path - else: - trained_index_path = args.index_path - - if os.path.exists(trained_index_path): - trained_retriever = retriever_type.load(trained_index_path, args.config_path) - if trained_retriever.cfg == cfg: - logger.info(f"Load trained index: {trained_index_path}") - return trained_retriever - raise FileExistsError(trained_index_path) - - retriever = retriever_type.build(cfg) - if use_gpu: - retriever.to_gpu_train() - - timer = Stopwatch() - logger.info(f"Train a retriever from {train_size:,} datapoints.") - with timer.measure(): - retriever.train(training_vectors) - logger.info(f"Training done in {timer.total:.1f} seconds") - retriever.save(trained_index_path, args.config_path) - return retriever_type.load(trained_index_path, args.config_path) - - -def main(args: Namespace) -> None: - logger.info(args) - - timer = Stopwatch() - use_gpu = torch.cuda.is_available() and not args.cpu - chunk_size = args.chunk_size - 
with contextlib.ExitStack() as stack: - kvstores = [ - stack.enter_context(KVStore.open(fname, mode="r")) for fname in args.kvstore - ] - training_vectors = np.concatenate( - [ - kvstore.key[: min(args.train_size // len(kvstores), len(kvstore))] - for kvstore in kvstores - ] - ) - retriever = train_retriever(args, training_vectors, use_gpu=use_gpu) - if use_gpu: - retriever.to_gpu_add() - - logger.info(f"Build a retriever in {args.index_path}") - for kvstore in kvstores: - logger.info(f"Add {len(kvstore):,} vectors from {kvstore.filename}") - offset = len(retriever) if args.append_sequential else 0 - with timer.measure(): - for i in tqdm( - range(math.ceil(len(kvstore) / chunk_size)), - desc="Building a retriever", - ): - start_idx = i * chunk_size - end_idx = min(start_idx + chunk_size, len(kvstore)) - num_added = end_idx - start_idx - logger.info(f"Add vectors: {num_added:,} / {len(kvstore):,}") - retriever.add( - kvstore.key[start_idx:end_idx], - kvstore.value[start_idx:end_idx] + offset, - ) - logger.info(f"Retriever index size: {len(retriever):,}") - - if args.save_checkpoint: - retriever.save(args.index_path, args.config_path) - - if not args.save_checkpoint: - retriever.save(args.index_path, args.config_path) - - logger.info(f"Added {len(retriever):,} datapoints") - logger.info(f"Retriever index size: {len(retriever):,}") - logger.info(f"Built the retriever in {timer.total:.1f} seconds") - - -def cli_main() -> None: - args = parse_args() - main(args) - - -if __name__ == "__main__": - cli_main() diff --git a/semsis/cli/query_interactive.py b/semsis/cli/query_interactive.py deleted file mode 100644 index a2b9694..0000000 --- a/semsis/cli/query_interactive.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python3 -import fileinput -import logging -import sys -from argparse import ArgumentParser, Namespace -from collections import defaultdict -from os import PathLike -from typing import Generator, List - -import torch - -from semsis.encoder import SentenceEncoder -from semsis.registry import get_registry -from semsis.retriever import load_backend_from_config -from semsis.utils import Stopwatch - -logging.basicConfig( - format="| %(asctime)s | %(levelname)s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - level="INFO", - stream=sys.stdout, -) -logger = logging.getLogger("semsis.cli.query_interactive") - - -def buffer_lines( - input: PathLike, buffer_size: int = 1 -) -> Generator[List[str], None, None]: - buf: List[str] = [] - with fileinput.input( - [input], mode="r", openhook=fileinput.hook_encoded("utf-8") - ) as f: - for line in f: - buf.append(line.strip()) - if len(buf) >= buffer_size: - yield buf - buf = [] - if len(buf) > 0: - yield buf - - -def parse_args() -> Namespace: - """Parses the command line arguments. - - Returns: - Namespace: Command line arguments. 
- """ - parser = ArgumentParser() - # fmt: off - parser.add_argument("--input", type=str, default="-", - help="Input file.") - parser.add_argument("--index-path", metavar="FILE", - help="Path to an index file.") - parser.add_argument("--config-path", metavar="FILE", required=True, - help="Path to a configuration file.") - parser.add_argument("--model", type=str, default="sentence-transformers/LaBSE", - help="Model name") - parser.add_argument("--representation", type=str, default="sbert", - choices=get_registry("sentence_encoder").keys(), - help="Sentence representation type.") - parser.add_argument("--gpu-encode", action="store_true", - help="Transfer the encoder to GPUs.") - parser.add_argument("--gpu-retrieve", action="store_true", - help="Transfer the retriever to GPUs.") - parser.add_argument("--fp16", action="store_true", - help="Use FP16.") - parser.add_argument("--ntrials", type=int, default=1, - help="Number of trials to measure the search time.") - parser.add_argument("--topk", type=int, default=1, - help="Search the top-k nearest neighbor.") - parser.add_argument("--buffer-size", type=int, default=1, - help="Buffer size to query at a time.") - parser.add_argument("--msec", action="store_true", - help="Show the search time in milli seconds instead of seconds.") - parser.add_argument("--efsearch", type=int, default=16, - help="Set the efSearch parameter for the HNSW indexes. " - "This corresponds to the beam width at the search time.") - parser.add_argument("--nprobe", type=int, default=8, - help="Set the nprobe parameter for the IVF indexes. " - "This corresponds to the number of neighboring clusters to be searched.") - # fmt: on - return parser.parse_args() - - -def main(args: Namespace) -> None: - logger.info(args) - - encoder = SentenceEncoder.build(args.model, args.representation) - if torch.cuda.is_available() and args.gpu_encode: - encoder = encoder.cuda() - if args.fp16: - encoder = encoder.half() - logger.info(f"The encoder is on the GPU.") - - retriever_type = load_backend_from_config(args.config_path) - retriever = retriever_type.load(args.index_path, args.config_path) - retriever.set_efsearch(args.efsearch) - retriever.set_nprobe(args.nprobe) - if torch.cuda.is_available() and args.gpu_retrieve: - retriever.to_gpu_search() - logger.info(f"Retriever configuration: {retriever.cfg}") - logger.info(f"Retriever index size: {len(retriever):,}") - - timers = defaultdict(Stopwatch) - ntrials = args.ntrials - start_id = 0 - nqueryed = 0 - acctimes = defaultdict(float) - unit = "msec" if args.msec else "sec" - for lines in buffer_lines(args.input, buffer_size=args.buffer_size): - timers["encode"].reset() - timers["retrieve"].reset() - for _ in range(ntrials): - with timers["encode"].measure(): - querys = encoder.encode(lines).cpu().numpy() - with timers["retrieve"].measure(): - dists, idxs = retriever.search(querys, k=args.topk) - - for i, line in enumerate(lines): - uid = start_id + i - dist_str = " ".join([f"{x:.3f}" for x in dists[i].tolist()]) - idx_str = " ".join([str(x) for x in idxs[i].tolist()]) - print(f"Q-{uid}\t{line}") - print(f"D-{uid}\t{dist_str}") - print(f"I-{uid}\t{idx_str}") - - times = {k: timer.total for k, timer in timers.items()} - times["search"] = sum([timer.total for timer in timers.values()]) - if args.msec: - times = {k: t * 1000 for k, t in times.items()} - for name, t in times.items(): - acctimes[name] += t - - nqueryed = start_id + len(lines) - for name, c in [("encode", "E"), ("retrieve", "R"), ("search", "S")]: - t = times[name] - at = times[name] 
/ ntrials - print(f"T{c}-{start_id}:{nqueryed}\t{t:.1f} {unit}") - print(f"AT{c}-{start_id}:{nqueryed}\t{at:.1f} {unit}") - - start_id = nqueryed - - batch_avgtimes = defaultdict(float) - single_avgtimes = defaultdict(float) - for k, v in acctimes.items(): - batch_avgtimes[k] = v / ntrials - single_avgtimes[k] = v / (nqueryed * ntrials) - - for k in acctimes.keys(): - print(f"Total {k} time: {acctimes[k]:.1f} {unit}") - print(f"Average {k} time per batch: {batch_avgtimes[k]:.1f} {unit}") - print(f"Average {k} time per single query: {single_avgtimes[k]:.1f} {unit}") - - -def cli_main() -> None: - args = parse_args() - main(args) - - -if __name__ == "__main__": - cli_main() diff --git a/semsis/__init__.py b/src/semsis/__init__.py similarity index 100% rename from semsis/__init__.py rename to src/semsis/__init__.py diff --git a/semsis/cli/README.rst b/src/semsis/cli/README.rst similarity index 100% rename from semsis/cli/README.rst rename to src/semsis/cli/README.rst diff --git a/src/semsis/cli/build_retriever.py b/src/semsis/cli/build_retriever.py new file mode 100644 index 0000000..c39eec9 --- /dev/null +++ b/src/semsis/cli/build_retriever.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +import contextlib +import logging +import math +import os +import sys +from dataclasses import asdict, dataclass +from typing import Optional + +import numpy as np +import simple_parsing +import torch +from simple_parsing import field +from tqdm import tqdm + +from semsis.kvstore import KVStore +from semsis.retriever import Metric, Retriever +from semsis.retriever.base import RetrieverParam +from semsis.typing import NDArrayFloat +from semsis.utils import Stopwatch + +logging.basicConfig( + format="| %(asctime)s | %(levelname)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level="INFO", + stream=sys.stderr, +) +logger = logging.getLogger("semsis.cli.build_retriever") + + +@dataclass +class Config: + """Configuraiton for build_retriever""" + + # Path to an index file. + index_path: str = field() + # Path to an index configuration file. + config_path: str = field() + # Path to key--value store files. + kvstore: list[str] = field(nargs="+") + + # Path to a trained index file. + # If this option is not specified, the index path will be set. + trained_index_path: Optional[str] = None + # Only use CPU. + cpu: bool = False + # Backend of the search engine. + backend: str = "faiss-cpu" + # Distance function. + metric: Metric = Metric.l2 + # The number of data to be loaded at a time. + chunk_size: int = 1000000 + # The number of training data of the index. + train_size: int = 1000000 + # Append entries to the tail. + append_sequential: bool = False + # Save checkpoint after adding each key--value set. 
+ save_checkpoint: bool = False + + +def parse_args() -> tuple[Config, RetrieverParam]: + parser = simple_parsing.ArgumentParser() + parser.add_arguments(Config, "common") + parser.add_arguments(RetrieverParam, "retriever_param") + args = parser.parse_args() + return args.common, args.retriever_param + + +def train_retriever( + args: Config, + retriever_param: RetrieverParam, + training_vectors: NDArrayFloat, + use_gpu: bool = False, +) -> Retriever: + train_size, dim = training_vectors.shape + retriever_type = Retriever.get_cls(args.backend) + dim = training_vectors.shape[1] + + cfg = retriever_type.Config( + dim=dim, + backend=args.backend, + metric=args.metric, + **{ + k: v + for k, v in asdict(retriever_param).items() + if hasattr(retriever_type.Config, k) + }, + ) + logger.info(cfg) + logger.info(f"Input vector: {cfg.dim} dimension") + + trained_index_path = args.trained_index_path or args.index_path + if os.path.exists(trained_index_path): + trained_retriever = retriever_type.load(trained_index_path, args.config_path) + if trained_retriever.cfg == cfg: + logger.info(f"Load trained index: {trained_index_path}") + return trained_retriever + raise FileExistsError(trained_index_path) + + retriever = retriever_type.build(cfg) + if use_gpu: + retriever.to_gpu_train() + + timer = Stopwatch() + logger.info(f"Train a retriever from {train_size:,} datapoints.") + with timer.measure(): + retriever.train(training_vectors) + logger.info(f"Training done in {timer.total:.1f} seconds") + retriever.save(trained_index_path, args.config_path) + return retriever_type.load(trained_index_path, args.config_path) + + +def main(args: Config, retriever_param: RetrieverParam) -> None: + logger.info(args) + + timer = Stopwatch() + use_gpu = torch.cuda.is_available() and not args.cpu + chunk_size = args.chunk_size + with contextlib.ExitStack() as stack: + kvstores = [ + stack.enter_context(KVStore.open(fname, mode="r")) for fname in args.kvstore + ] + training_vectors: NDArrayFloat = np.concatenate( + [ + kvstore.key[: min(args.train_size // len(kvstores), len(kvstore))] + for kvstore in kvstores + ] + ) + retriever = train_retriever( + args, retriever_param, training_vectors, use_gpu=use_gpu + ) + if use_gpu: + retriever.to_gpu_add() + + logger.info(f"Build a retriever in {args.index_path}") + for kvstore in kvstores: + logger.info(f"Add {len(kvstore):,} vectors from {kvstore.filename}") + offset = len(retriever) if args.append_sequential else 0 + with timer.measure(): + for i in tqdm( + range(math.ceil(len(kvstore) / chunk_size)), + desc="Building a retriever", + ): + start_idx = i * chunk_size + end_idx = min(start_idx + chunk_size, len(kvstore)) + num_added = end_idx - start_idx + logger.info(f"Add vectors: {num_added:,} / {len(kvstore):,}") + retriever.add( + kvstore.key[start_idx:end_idx], + kvstore.value[start_idx:end_idx] + offset, + ) + logger.info(f"Retriever index size: {len(retriever):,}") + + if args.save_checkpoint: + retriever.save(args.index_path, args.config_path) + + if not args.save_checkpoint: + retriever.save(args.index_path, args.config_path) + + logger.info(f"Added {len(retriever):,} datapoints") + logger.info(f"Retriever index size: {len(retriever):,}") + logger.info(f"Built the retriever in {timer.total:.1f} seconds") + + +def cli_main() -> None: + args, retriever_param = parse_args() + main(args, retriever_param) + + +if __name__ == "__main__": + cli_main() diff --git a/src/semsis/cli/query_interactive.py b/src/semsis/cli/query_interactive.py new file mode 100644 index 0000000..5f60fc6 
--- /dev/null +++ b/src/semsis/cli/query_interactive.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +import argparse +import enum +import fileinput +import json +import logging +import sys +from collections import defaultdict +from dataclasses import dataclass +from typing import Generator + +import simple_parsing +import torch +from simple_parsing.helpers.fields import choice, field + +from semsis.encoder import SentenceEncoder +from semsis.registry import get_registry +from semsis.retriever import load_backend_from_config +from semsis.typing import StrPath +from semsis.utils import Stopwatch + +logging.basicConfig( + format="| %(asctime)s | %(levelname)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level="INFO", + stream=sys.stderr, +) +logger = logging.getLogger("semsis.cli.query_interactive") + + +def buffer_lines( + input: StrPath, buffer_size: int = 1, prefix_string: str = "" +) -> Generator[list[str], None, None]: + buf: list[str] = [] + with fileinput.input( + [input], mode="r", openhook=fileinput.hook_encoded("utf-8") + ) as f: + for line in f: + buf.append(prefix_string + line.strip()) + if len(buf) >= buffer_size: + yield buf + buf = [] + if len(buf) > 0: + yield buf + + +class Format(str, enum.Enum): + plain = "plain" + json = "json" + + +@dataclass +class Config: + """Configuration for query_interactive""" + + # Path to an index file. + index_path: str + # Path to an index configuration file. + config_path: str + + # Path to an input file. If not specified, read from the standard input. + input: str = "-" + # Path to an output file. + output: argparse.FileType("w") = field(default="-") + # Output format. + format: Format = Format.plain + # Model name. + model: str = "sentence-transformers/LaBSE" + # Type of sentence representation. + representation: str = choice( + *get_registry("sentence_encoder").keys(), default="sbert" + ) + + # Use fp16. + fp16: bool = False + # Buffer size to query at a time. + buffer_size: int = 128 + # Transfer the encoder to GPUs. + gpu_encode: bool = False + # Transfer the retriever to GPUs. + gpu_retrieve: bool = False + # Number of trials to measure the search time. + ntrials: int = 1 + # Search the top-k nearest neighbor. + topk: int = 1 + # Show the search time in milli seconds instead of seconds. + msec: bool = False + # Set the efSearch parameter for the HNSW indexes. + # This corresponds to the beam width at the search time. + efsearch: int = 16 + # Set the nprobe parameter for the IVF indexes. + # This corresponds to the number of neighboring clusters to be searched. + nprobe: int = 8 + # Prefix string. + # This option is useful for `intfloat/e5-large`. 
+ prefix_string: str = "" + + +def main(args: Config) -> None: + logger.info(args) + + encoder = SentenceEncoder.build(args.model, args.representation) + if torch.cuda.is_available() and args.gpu_encode: + if args.fp16: + encoder = encoder.half() + encoder = encoder.cuda() + logger.info("The encoder is on the GPU.") + + retriever_type = load_backend_from_config(args.config_path) + retriever = retriever_type.load(args.index_path, args.config_path) + retriever.set_efsearch(args.efsearch) + retriever.set_nprobe(args.nprobe) + if torch.cuda.is_available() and args.gpu_retrieve: + retriever.to_gpu_search() + logger.info(f"Retriever configuration: {retriever.cfg}") + logger.info(f"Retriever index size: {len(retriever):,}") + + def _print(string: str): + print(string, file=args.output) + + timers = defaultdict(Stopwatch) + ntrials = args.ntrials + start_id = 0 + nqueryed = 0 + acctimes = defaultdict(float) + unit = "msec" if args.msec else "sec" + for lines in buffer_lines( + args.input, buffer_size=args.buffer_size, prefix_string=args.prefix_string + ): + timers["encode"].reset() + timers["retrieve"].reset() + + with timers["encode"].measure(): + querys = encoder.encode(lines).cpu().numpy() + with timers["retrieve"].measure(): + dists, idxs = retriever.search(querys, k=args.topk) + for _ in range(ntrials - 1): + with timers["encode"].measure(): + querys = encoder.encode(lines).cpu().numpy() + with timers["retrieve"].measure(): + dists, idxs = retriever.search(querys, k=args.topk) + + for i, line in enumerate(lines): + uid = start_id + i + + # python<=3.9 does not support the match statement. + if args.format == Format.plain: + dist_str = " ".join([f"{x:.3f}" for x in dists[i].tolist()]) + idx_str = " ".join([str(x) for x in idxs[i].tolist()]) + _print(f"Q-{uid}\t{line}") + _print(f"D-{uid}\t{dist_str}") + _print(f"I-{uid}\t{idx_str}") + elif args.format == Format.json: + res = { + "i": uid, + "query": line, + "results": [ + {"rank": rank, "distance": distance, "idx": idx} + for rank, (distance, idx) in enumerate( + zip(dists[i].tolist(), idxs[i].tolist()) + ) + ], + } + _print(json.dumps(res, ensure_ascii=False)) + + times = {k: timer.total for k, timer in timers.items()} + times["search"] = sum([timer.total for timer in timers.values()]) + if args.msec: + times = {k: t * 1000 for k, t in times.items()} + for name, t in times.items(): + acctimes[name] += t + + nqueryed = start_id + len(lines) + for name, c in [("encode", "E"), ("retrieve", "R"), ("search", "S")]: + t = times[name] + at = times[name] / ntrials + logger.info(f"T{c}-{start_id}:{nqueryed}\t{t:.1f} {unit}") + logger.info(f"AT{c}-{start_id}:{nqueryed}\t{at:.1f} {unit}") + + start_id = nqueryed + + batch_avgtimes = defaultdict(float) + single_avgtimes = defaultdict(float) + for k, v in acctimes.items(): + batch_avgtimes[k] = v / ntrials + single_avgtimes[k] = v / (nqueryed * ntrials) + + for k in acctimes.keys(): + logger.info(f"Total {k} time: {acctimes[k]:.1f} {unit}") + logger.info(f"Average {k} time per batch: {batch_avgtimes[k]:.1f} {unit}") + logger.info( + f"Average {k} time per single query: {single_avgtimes[k]:.1f} {unit}" + ) + + +def cli_main() -> None: + args = simple_parsing.parse(Config) + main(args) + + +if __name__ == "__main__": + cli_main() diff --git a/semsis/cli/store_kv.py b/src/semsis/cli/store_kv.py similarity index 55% rename from semsis/cli/store_kv.py rename to src/semsis/cli/store_kv.py index 18beb22..c7f727d 100644 --- a/semsis/cli/store_kv.py +++ b/src/semsis/cli/store_kv.py @@ -5,13 +5,13 @@ import math 
import signal import sys -from argparse import ArgumentParser, Namespace from dataclasses import dataclass -from os import PathLike -from typing import Generator, List +from typing import Generator import numpy as np +import simple_parsing import torch +from simple_parsing.helpers.fields import choice from tqdm import tqdm from transformers import BatchEncoding @@ -19,13 +19,14 @@ from semsis.encoder.tokenizer import Tokenizer from semsis.kvstore import KVStore from semsis.registry import get_registry +from semsis.typing import NDArrayFloat, NDArrayI64, StrPath from semsis.utils import Stopwatch logging.basicConfig( format="| %(asctime)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level="INFO", - stream=sys.stdout, + stream=sys.stderr, ) logger = logging.getLogger("semsis.cli.store_kv") @@ -35,23 +36,23 @@ class Batch: """Mini-batch class. inputs (BatchEncoding): Model inputs. - ids (np.ndarray): Sample IDs. + ids (NDArrayI64): Sample IDs. """ inputs: BatchEncoding - ids: np.ndarray + ids: NDArrayI64 @dataclass class Dataset: """Dataset class. - sequences (List[List[int]]): Token ID sequences. - lengths (np.ndarray): Lengths of each sequence. + sequences (list[list[int]]): Token ID sequences. + lengths (NDArrayI64): Lengths of each sequence. """ - sequences: List[List[int]] - lengths: np.ndarray + sequences: list[list[int]] + lengths: NDArrayI64 def __len__(self): return len(self.lengths) @@ -87,79 +88,115 @@ def set_pdeathsig(sig: int) -> None: libc.prctl(PR_SET_PDEATHSIG, sig) +def read_lines(file: StrPath, prefix_string: str = "") -> Generator[str, None, None]: + """Read lines. + + Args: + file (StrPath): Input file. + prefix_string (str): Prefix string to be added for each line. + """ + with open(file, mode="r") as f: + for line in f: + yield prefix_string + line.strip() + + def prepare_dataset( - file: PathLike, + file: StrPath, tokenizer: Tokenizer, num_workers: int = 1, chunk_size: int = 100000, + prefix_string: str = "", ) -> Dataset: """Prepare a dataset. Args: - file (os.PathLike): Input file. + file (StrPath): Input file. tokenizer (Tokenizer): Tokenizer. num_workers (int, optional): Number of workers. chunk_size (int): Size of data processed by each process at a time. + prefix_string (str): Prefix string. Returns: Dataset: A dataset dataclass. """ - with open(file, mode="r") as f: + sequences: list[list[int]] = [] + if tokenizer.is_fast: + chunk = [] + for line in tqdm( + read_lines(file, prefix_string=prefix_string), + desc="Preprocess the data", + mininterval=1, + ): + chunk.append(line) + if len(chunk) >= chunk_size: + sequences += tokenizer.tokenize_batch(chunk) + chunk = [] + if len(chunk) > 0: + sequences += tokenizer.tokenize_batch(chunk) + else: with concurrent.futures.ProcessPoolExecutor( max_workers=num_workers, initializer=set_pdeathsig, initargs=(signal.SIGINT,), ) as executor: - sequences = list( - tqdm( - executor.map(tokenizer.tokenize, f, chunksize=chunk_size), - desc="Preprocess the data", - mininterval=1, - ) - ) + for tokenized_chunk in tqdm( + executor.map( + tokenizer.tokenize, + read_lines(file, prefix_string=prefix_string), + chunksize=chunk_size, + ), + desc="Preprocess the data", + mininterval=1, + ): + sequences.append(tokenized_chunk) return Dataset(sequences, np.array([len(seq) for seq in sequences])) -def parse_args() -> Namespace: - """Parses the command line arguments. - - Returns: - Namespace: Command line arguments. 
- """ - parser = ArgumentParser() - # fmt: off - parser.add_argument("--input", type=str, required=True, - help="Input file.") - parser.add_argument("--output", type=str, default="kv.bin", - help="Path to the key--value store.") - parser.add_argument("--model", type=str, default="sentence-transformers/LaBSE", - help="Model name") - parser.add_argument("--representation", type=str, default="sbert", - choices=get_registry("sentence_encoder").keys(), - help="Sentence representation type.") - parser.add_argument("--batch-size", type=int, default=128, - help="Batch size.") - parser.add_argument("--fp16", action="store_true", - help="Use FP16.") - parser.add_argument("--workers", type=int, default=16, - help="Number of workers to preprocess the data.") - parser.add_argument("--chunk-size", type=int, default=100000, - help="Chunk size for multi-processing.") - # fmt: on - return parser.parse_args() - - -def main(args: Namespace) -> None: +@dataclass +class Config: + """Configuration for store_kv""" + + # Path to an input file. + input: str + # Path to the key-value store file. + output: str = "kv.bin" + # Model name. + model: str = "sentence-transformers/LaBSE" + # Type of sentence representation. + representation: str = choice( + *get_registry("sentence_encoder").keys(), default="sbert" + ) + # Use fp16. + fp16: bool = False + # Batch size. + batch_size: int = 128 + # Number of workers for preprocessing the data. + workers: int = 16 + # Chunk size for multi-processing. + chunk_size: int = 100000 + + # Prefix string. + # This option is useful for `intfloat/e5-large`. + prefix_string: str = "" + + +def main(args: Config) -> None: logger.info(args) timer = Stopwatch() encoder = SentenceEncoder.build(args.model, args.representation) tokenizer = encoder.tokenizer - logger.info(f"Start preprocessing the data") + logger.info("Start preprocessing the data") with timer.measure(): - dataset = prepare_dataset(args.input, tokenizer, args.workers, args.chunk_size) + dataset = prepare_dataset( + args.input, + tokenizer, + args.workers, + args.chunk_size, + prefix_string=args.prefix_string, + ) logger.info(f"Dataset size: {len(dataset):,}") logger.info(f"Preprocessed the data in {timer.total:.1f} seconds.") @@ -168,14 +205,14 @@ def main(args: Namespace) -> None: if args.fp16: encoder = encoder.half() - logger.info(f"Start storing the keys and values.") + logger.info("Start storing the keys and values.") with KVStore.open(args.output, mode="w") as kvstore: kvstore.new(encoder.get_embed_dim(), np.float16 if args.fp16 else np.float32) timer.reset() with timer.measure(): for batch in dataset.yield_batches(tokenizer, args.batch_size): - sentence_vectors = ( + sentence_vectors: NDArrayFloat = ( encoder(batch.inputs.to(encoder.device)).cpu().numpy() ) kvstore.add(sentence_vectors, batch.ids) @@ -183,7 +220,7 @@ def main(args: Namespace) -> None: def cli_main() -> None: - args = parse_args() + args = simple_parsing.parse(Config) main(args) diff --git a/semsis/encoder/__init__.py b/src/semsis/encoder/__init__.py similarity index 100% rename from semsis/encoder/__init__.py rename to src/semsis/encoder/__init__.py diff --git a/semsis/encoder/sentence_encoder.py b/src/semsis/encoder/sentence_encoder.py similarity index 93% rename from semsis/encoder/sentence_encoder.py rename to src/semsis/encoder/sentence_encoder.py index 5b93f82..3e4725d 100644 --- a/semsis/encoder/sentence_encoder.py +++ b/src/semsis/encoder/sentence_encoder.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import abc -from typing import Dict, 
List import torch import torch.nn as nn @@ -60,11 +61,11 @@ def build(cls, model_name_or_path: str, representation: str) -> "SentenceEncoder """ return get_cls(representation)(model_name_or_path) - def encode(self, sentences: List[str]) -> Tensor: + def encode(self, sentences: list[str]) -> Tensor: """Encode sentences into their sentence vectors. Args: - sentences (List[str]): Input sentences. + sentences (list[str]): Input sentences. Returns: Tensor: Sentence vectors of shape `(batch_size, embed_dim)`. @@ -95,7 +96,7 @@ def get_embed_dim(self) -> int: @register("avg") class SentenceEncoderAvg(SentenceEncoder): - def forward(self, net_inputs: Dict[str, Tensor]) -> Tensor: + def forward(self, net_inputs: dict[str, Tensor]) -> Tensor: """Return the feature vectors of the given inputs. Args: @@ -112,7 +113,7 @@ def forward(self, net_inputs: Dict[str, Tensor]) -> Tensor: @register("cls") class SentenceEncoderCls(SentenceEncoder): - def forward(self, net_inputs: Dict[str, Tensor]) -> Tensor: + def forward(self, net_inputs: dict[str, Tensor]) -> Tensor: """Return the feature vectors of the given inputs. Args: @@ -136,7 +137,7 @@ def load_model(self, name_or_path: str) -> None: self.model = SentenceTransformer(name_or_path) self.tokenizer = Tokenizer(self.model.tokenizer) - def forward(self, net_inputs: Dict[str, Tensor]) -> Tensor: + def forward(self, net_inputs: dict[str, Tensor]) -> Tensor: """Return the feature vectors of the given inputs. Args: diff --git a/semsis/encoder/sentence_encoder_test.py b/src/semsis/encoder/sentence_encoder_test.py similarity index 100% rename from semsis/encoder/sentence_encoder_test.py rename to src/semsis/encoder/sentence_encoder_test.py diff --git a/semsis/encoder/tokenizer.py b/src/semsis/encoder/tokenizer.py similarity index 60% rename from semsis/encoder/tokenizer.py rename to src/semsis/encoder/tokenizer.py index c3b3ea8..229f2e9 100644 --- a/semsis/encoder/tokenizer.py +++ b/src/semsis/encoder/tokenizer.py @@ -1,4 +1,6 @@ -from typing import List, Union +from __future__ import annotations + +from typing import Union from transformers import ( AutoTokenizer, @@ -14,8 +16,13 @@ def __init__( ) -> None: self.tokenizer = tokenizer + @property + def is_fast(self) -> bool: + """Whether the tokenizer is `PreTrainedTokenizerFast` or not.""" + return self.tokenizer.is_fast + @classmethod - def build(cls, model_name_or_path: str) -> "Tokenizer": + def build(cls, model_name_or_path: str) -> Tokenizer: """Build the tokenizer. Args: @@ -29,24 +36,43 @@ def build(cls, model_name_or_path: str) -> "Tokenizer": def __call__(self, *args, **kwargs): return self.tokenizer(*args, **kwargs) - def tokenize(self, sentence: str) -> List[int]: + def tokenize(self, sentence: str) -> list[int]: """Tokenize and convert a sentence to the token sequence. Args: sentence (str): An input sentence. Returns: - List[int]: A token ID sequence. + list[int]: A token ID sequence. """ return self.tokenizer.encode( sentence, add_special_tokens=False, truncation=True ) - def collate(self, samples: List[List[int]]) -> BatchEncoding: + def tokenize_batch(self, sentences: list[str]) -> list[list[int]]: + """Tokenize and convert sentences to their token sequences. + + Args: + sentences (list[str]): Input sentences. + + Returns: + list[list[int]]: Token ID sequences. 
+ """ + if self.is_fast: + return self.tokenizer.batch_encode_plus( + sentences, + add_special_tokens=False, + truncation=True, + return_attention_mask=False, + )["input_ids"] + else: + return [self.tokenize(sentence) for sentence in sentences] + + def collate(self, samples: list[list[int]]) -> BatchEncoding: """Make a mini-batch from samples. Args: - samples (List[List[int]]): Token sequences. + samples (list[list[int]]): Token sequences. Returns: BatchEncoding: A mini-batch. diff --git a/semsis/encoder/tokenizer_test.py b/src/semsis/encoder/tokenizer_test.py similarity index 100% rename from semsis/encoder/tokenizer_test.py rename to src/semsis/encoder/tokenizer_test.py diff --git a/semsis/kvstore.py b/src/semsis/kvstore.py similarity index 80% rename from semsis/kvstore.py rename to src/semsis/kvstore.py index 8475d10..cd215b0 100644 --- a/semsis/kvstore.py +++ b/src/semsis/kvstore.py @@ -1,11 +1,14 @@ +from __future__ import annotations + import contextlib -from os import PathLike -from typing import Any, Generator, Optional +from typing import Generator, Optional import h5py import numpy as np from numpy.typing import DTypeLike +from semsis.typing import NDArrayFloat, NDArrayI64, StrPath + class KVStore: """Key--value store class. @@ -41,12 +44,12 @@ def dtype(self) -> DTypeLike: """Return the dtype of the key array.""" return self.key.dtype - def add(self, keys: np.ndarray, values: Optional[np.ndarray] = None) -> None: + def add(self, keys: NDArrayFloat, values: Optional[NDArrayI64] = None) -> None: """Add the given key vectors into the key array. Args: - keys (np.ndarray): The key vectors of shape `(n, dim)`. - values (np.ndarray, optional): The value IDs of shape `(n, dim)`. + keys (NDArrayFloat): The key vectors of shape `(n, dim)`. + values (NDArrayI64, optional): The value IDs of shape `(n, dim)`. If values are not given, value IDs will be assigned incrementally. """ ntotal = len(self) @@ -63,18 +66,18 @@ def new(self, dim: int, dtype: DTypeLike = np.float32) -> None: Args: dim (int): The dimension size. - dtype (DtypeLike): Dtype of the key array. + dtype (DTypeLike): Dtype of the key array. """ self.f.create_dataset("key", shape=(0, dim), dtype=dtype, maxshape=(None, dim)) self.f.create_dataset("value", shape=(0,), dtype=np.int64, maxshape=(None,)) @classmethod @contextlib.contextmanager - def open(cls, path: PathLike, mode: str = "r") -> Generator["KVStore", Any, None]: + def open(cls, path: StrPath, mode: str = "r") -> Generator[KVStore, None, None]: """Open a binary file of this kvstore. Args: - path (os.PathLike): A path to the file. + path (StrPath): A path to the file. mode (str): Mode of this file objects. See https://docs.h5py.org/en/stable/high/file.html diff --git a/semsis/kvstore_test.py b/src/semsis/kvstore_test.py similarity index 100% rename from semsis/kvstore_test.py rename to src/semsis/kvstore_test.py diff --git a/semsis/registry.py b/src/semsis/registry.py similarity index 86% rename from semsis/registry.py rename to src/semsis/registry.py index 997264b..768b97f 100644 --- a/semsis/registry.py +++ b/src/semsis/registry.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Dict, Type, TypeVar +from typing import Any, Callable, Type, TypeVar T = TypeVar("T") @@ -41,13 +41,13 @@ def get_cls(name: str): return register, get_cls -def get_registry(registry_name: str) -> Dict[str, Type[Any]]: +def get_registry(registry_name: str) -> dict[str, Type[Any]]: """Get registry of the given name. 
Args: registry_name (str): Registry name to be returned. Returns: - Dict[str, Type[Any]]: Class mapper from registered name to its corresponding class. + dict[str, Type[Any]]: Class mapper from registered name to its corresponding class. """ return REGISTRIES[registry_name] diff --git a/semsis/registry_test.py b/src/semsis/registry_test.py similarity index 100% rename from semsis/registry_test.py rename to src/semsis/registry_test.py diff --git a/semsis/retriever/__init__.py b/src/semsis/retriever/__init__.py similarity index 66% rename from semsis/retriever/__init__.py rename to src/semsis/retriever/__init__.py index adc6005..2a6c220 100644 --- a/semsis/retriever/__init__.py +++ b/src/semsis/retriever/__init__.py @@ -1,25 +1,23 @@ -from os import PathLike -from typing import Type - import yaml from semsis import registry +from semsis.typing import StrPath register, get_cls = registry.setup("retriever") -from .base import Retriever +from .base import Retriever, Metric from .faiss_cpu import RetrieverFaissCPU from .faiss_gpu import RetrieverFaissGPU -def load_backend_from_config(cfg_path: PathLike) -> Type[Retriever]: +def load_backend_from_config(cfg_path: StrPath) -> type[Retriever]: """Load the backend retriever type from the configuration file. Args: - cfg_path (os.PathLike): Path to the configuration file. + cfg_path (StrPath): Path to the configuration file. Returns: - Type[Retriever]: The backend retriever type. + type[Retriever]: The backend retriever type. """ with open(cfg_path, mode="r") as f: cfg = yaml.safe_load(f) @@ -27,6 +25,7 @@ def load_backend_from_config(cfg_path: PathLike) -> Type[Retriever]: __all__ = [ + "Metric", "Retriever", "RetrieverFaissCPU", "RetrieverFaissGPU", diff --git a/semsis/retriever/base.py b/src/semsis/retriever/base.py similarity index 50% rename from semsis/retriever/base.py rename to src/semsis/retriever/base.py index 4faf348..ab6ec20 100644 --- a/semsis/retriever/base.py +++ b/src/semsis/retriever/base.py @@ -1,16 +1,58 @@ +from __future__ import annotations + import abc +import enum from dataclasses import asdict, dataclass -from os import PathLike -from typing import Any, Optional, Tuple, Type, TypeVar +from typing import Any, Optional, TypeVar -import numpy as np import yaml from semsis import retriever +from semsis.typing import NDArrayF32, NDArrayFloat, NDArrayI64, StrPath T = TypeVar("T") +class Metric(str, enum.Enum): + l2 = "l2" + ip = "ip" + cos = "cos" + + +@dataclass +class RetrieverParam: + """Configuration of the retriever. + + - hnsw_nlinks (int): [HNSW] Number of links for each node. + If this value is greater than 0, HNSW will be used. + - ivf_nlists (int): [IVF] Number of centroids. + - pq_nblocks (int): [PQ] Number of sub-vectors to be splitted. + - pq_nbits (int): [PQ] Size of codebooks for each sub-space. + Usually 8 bit is employed; thus, each codebook has 256 codes. + - opq (bool): [OPQ] Use OPQ pre-transformation which minimizes the quantization error. + - pca (bool): [PCA] Use PCA dimension reduction. + - pca_dim (int): [PCA] Dimension size which is reduced by PCA. + - fp16 (bool): Use FP16. 
(GPU only) + """ + + hnsw_nlinks: int = 0 + ivf_nlists: int = 0 + pq_nblocks: int = 0 + pq_nbits: int = 8 + opq: bool = False + pca: bool = False + pca_dim: int = 0 + fp16: bool = False + + def __post_init__(self): + self.hnsw = self.hnsw_nlinks > 0 + self.ivf = self.ivf_nlists > 0 + self.pq = self.pq_nblocks > 0 + if self.opq and self.pca: + raise ValueError("`opq` and `pca` cannot be set True at the same time.") + self.transform = self.opq or self.pca + + class Retriever(abc.ABC): """Base class of retriever classes. @@ -19,7 +61,7 @@ class Retriever(abc.ABC): cfg (Retriever.Config): Configuration dataclass. """ - def __init__(self, index: Any, cfg: "Config") -> None: + def __init__(self, index: Any, cfg: Config) -> None: self.index = index self.cfg = cfg @@ -29,28 +71,35 @@ class Config: - dim (int): Size of the dimension. - backend (str): Backend of the search engine. - - metric (str): Distance function. + - metric (Metric): Distance function. """ - dim: int + dim: int = -1 backend: str = "faiss-cpu" - metric: str = "l2" + metric: Metric = Metric.l2 - def save(self, path: PathLike) -> None: + def save(self, path: StrPath) -> None: """Save the configuration. Args: - path (os.PathLike): File path. + path (StrPath): File path. """ with open(path, mode="w") as f: - yaml.dump(asdict(self), f, indent=True) + yaml.dump( + { + k: v.value if isinstance(v, Metric) else v + for k, v in asdict(self).items() + }, + f, + indent=True, + ) @classmethod - def load(cls, path: PathLike): + def load(cls, path: StrPath): """Load the configuration. Args: - path (os.PathLike): File path. + path (StrPath): File path. Returns: Retriver.Config: This configuration object. @@ -65,7 +114,7 @@ def __len__(self) -> int: @classmethod @abc.abstractmethod - def build(cls: Type[T], cfg: "Config") -> T: + def build(cls: type[T], cfg: Config) -> T: """Build this class from the given configuration. @@ -104,54 +153,54 @@ def set_efsearch(self, efsearch: int) -> None: """ @abc.abstractmethod - def normalize(self, vectors: np.ndarray) -> np.ndarray: + def normalize(self, vectors: NDArrayFloat) -> NDArrayF32: """Normalize the input vectors for a backend library and the specified metric. Args: - vectors (np.ndarray): Input vectors. + vectors (NDArrayFloat): Input vectors. Returns: - np.ndarray: Normalized vectors. + NDArrayF32: Normalized vectors. """ @abc.abstractmethod - def train(self, vectors: np.ndarray) -> None: + def train(self, vectors: NDArrayFloat) -> None: """Train the index for some approximate nearest neighbor search algorithms. Args: - vectors (np.ndarray): Training vectors. + vectors (NDArrayFloat): Training vectors. """ @abc.abstractmethod - def add(self, vectors: np.ndarray, ids: Optional[np.ndarray] = None) -> None: + def add(self, vectors: NDArrayFloat, ids: Optional[NDArrayI64] = None) -> None: """Add key vectors to the index. Args: - vectors (np.ndarray): Key vectors to be added. - ids (np.ndarray, optional): Value indices. + vectors (NDArrayFloat): Key vectors to be added. + ids (NDArrayI64, optional): Value indices. """ @abc.abstractmethod - def search(self, querys: np.ndarray, k: int = 1) -> Tuple[np.ndarray, np.ndarray]: + def search(self, querys: NDArrayFloat, k: int = 1) -> tuple[NDArrayF32, NDArrayI64]: """Search the k nearest neighbor vectors of the querys. Args: - querys (np.ndarray): Query vectors. + querys (NDArrayFloat): Query vectors. k (int): Top-k. 
Returns: - distances (np.ndarray): Distances between the querys and the k nearest + distances (NDArrayF32): Distances between the querys and the k nearest neighbor vectors. - indices (np.ndarray): Indices of the k nearest neighbor vectors. + indices (NDArrayI64): Indices of the k nearest neighbor vectors. """ @classmethod - def load(cls: Type[T], index_path: PathLike, cfg_path: PathLike) -> T: + def load(cls: type[T], index_path: StrPath, cfg_path: StrPath) -> T: """Load the index and its configuration. Args: - index_path (os.PathLike): Index file path. - cfg_path (os.PathLike): Configuration file path. + index_path (StrPath): Index file path. + cfg_path (StrPath): Configuration file path. Returns: Retriever: This class. @@ -160,44 +209,44 @@ def load(cls: Type[T], index_path: PathLike, cfg_path: PathLike) -> T: index = cls.load_index(index_path) return cls(index, cfg) - def save(self, index_path: PathLike, cfg_path: PathLike) -> None: + def save(self, index_path: StrPath, cfg_path: StrPath) -> None: """Save the index and its configuration. Args: - index_path (os.PathLike): Index file path to save. - cfg_path (os.PathLike): Configuration file path to save. + index_path (StrPath): Index file path to save. + cfg_path (StrPath): Configuration file path to save. """ self.cfg.save(cfg_path) self.save_index(index_path) @classmethod @abc.abstractmethod - def load_index(cls, path: PathLike) -> Any: + def load_index(cls, path: StrPath) -> Any: """Load the index. Args: - path (os.PathLike): Index file path. + path (StrPath): Index file path. Returns: Any: Index object. """ @abc.abstractmethod - def save_index(self, path: PathLike) -> None: + def save_index(self, path: StrPath) -> None: """Save the index. Args: - path (os.PathLike): Index file path to save. + path (StrPath): Index file path to save. """ @classmethod - def get_cls(cls, name: str) -> Type["Retriever"]: + def get_cls(cls, name: str) -> type[Retriever]: """Return the retriever class from the given name. Args: name (str): Registered name. Returns: - Type[Retriever]: Retriever class. + type[Retriever]: Retriever class. """ return retriever.get_cls(name) diff --git a/semsis/retriever/base_test.py b/src/semsis/retriever/base_test.py similarity index 100% rename from semsis/retriever/base_test.py rename to src/semsis/retriever/base_test.py diff --git a/semsis/retriever/faiss_cpu.py b/src/semsis/retriever/faiss_cpu.py similarity index 71% rename from semsis/retriever/faiss_cpu.py rename to src/semsis/retriever/faiss_cpu.py index 69b8457..f7de568 100644 --- a/semsis/retriever/faiss_cpu.py +++ b/src/semsis/retriever/faiss_cpu.py @@ -1,24 +1,27 @@ +from __future__ import annotations + from dataclasses import dataclass -from os import PathLike -from typing import Any, Dict, Optional, Tuple +from typing import Optional import faiss import numpy as np from semsis.retriever import Retriever, register +from semsis.retriever.base import Metric, RetrieverParam +from semsis.typing import NDArrayF32, NDArrayFloat, NDArrayI64, StrPath -MetricType = int +FaissMetricType = int def faiss_index_builder( - cfg: "RetrieverFaissCPU.Config", dim: int, metric: MetricType + cfg: RetrieverFaissCPU.Config, dim: int, metric: FaissMetricType ) -> faiss.Index: """Build a faiss index from the given configuration. Args: cfg (RetrieverFaissCPU.Config): dim (int): - metric: MetricType + metric: FaissMetricType Returns: faiss.Index: Faiss index. @@ -55,22 +58,19 @@ class RetrieverFaissCPU(Retriever): cfg (RetrieverFaissCPU.Config): Configuration dataclass. 
""" - index: faiss.Index - cfg: "Config" - - METRICS_MAP: Dict[str, MetricType] = { - "l2": faiss.METRIC_L2, - "ip": faiss.METRIC_INNER_PRODUCT, - "cos": faiss.METRIC_INNER_PRODUCT, + METRICS_MAP: dict[Metric, FaissMetricType] = { + Metric.l2: faiss.METRIC_L2, + Metric.ip: faiss.METRIC_INNER_PRODUCT, + Metric.cos: faiss.METRIC_INNER_PRODUCT, } @dataclass - class Config(Retriever.Config): + class Config(RetrieverParam, Retriever.Config): """Configuration of the retriever. - dim (int): Size of the dimension. - backend (str): Backend of the search engine. - - metric (str): Distance function. + - metric (Metric): Distance function. - hnsw_nlinks (int): [HNSW] Number of links for each node. If this value is greater than 0, HNSW will be used. - ivf_nlists (int): [IVF] Number of centroids. @@ -80,37 +80,27 @@ class Config(Retriever.Config): - opq (bool): [OPQ] Use OPQ pre-transformation which minimizes the quantization error. - pca (bool): [PCA] Use PCA dimension reduction. - pca_dim (int): [PCA] Dimension size which is reduced by PCA. + - fp16 (bool): Use FP16. (GPU only) """ - hnsw_nlinks: int = 0 - ivf_nlists: int = 0 - pq_nblocks: int = 0 - pq_nbits: int = 8 - opq: bool = False - pca: bool = False - pca_dim: int = 0 - def __post_init__(self): - self.hnsw = self.hnsw_nlinks > 0 - self.ivf = self.ivf_nlists > 0 - self.pq = self.pq_nblocks > 0 - if self.opq and self.pca: - raise ValueError("`opq` and `pca` cannot be set True at the same time.") - self.transform = self.opq or self.pca - + super().__post_init__() if self.pca and self.pca_dim <= 0: self.pca_dim = self.dim + index: faiss.Index + cfg: Config + def __len__(self) -> int: """Return the size of the index.""" return self.index.ntotal @classmethod - def build(cls, cfg: "Config") -> "RetrieverFaissCPU": + def build(cls, cfg: Config) -> RetrieverFaissCPU: """Build this class from the given configuration. Args: - cfg (Retriever.Config): Configuration. + cfg (RetrieverFaissCPU.Config): Configuration. Returns: RetrieverFaissCPU: This class with the constucted index object. @@ -149,73 +139,73 @@ def set_efsearch(self, efsearch: int) -> None: if self.cfg.hnsw: faiss.ParameterSpace().set_index_parameter(self.index, "efSearch", efsearch) - def normalize(self, vectors: np.ndarray) -> np.ndarray: + def normalize(self, vectors: NDArrayFloat) -> NDArrayF32: """Normalize the input vectors for a backend library and the specified metric. Args: - vectors (np.ndarray): Input vectors. + vectors (NDArrayFloat): Input vectors. Returns: - np.ndarray: Normalized vectors. + NDArrayF32: Normalized vectors. """ if not np.issubdtype(vectors.dtype, np.float32): vectors = np.array(vectors, dtype=np.float32) - if self.cfg.metric == "cos": + if self.cfg.metric == Metric.cos: vectors /= np.fmax(np.linalg.norm(vectors, axis=-1, keepdims=True), 1e-9) return vectors - def train(self, vectors: np.ndarray) -> None: + def train(self, vectors: NDArrayFloat) -> None: """Train the index for some approximate nearest neighbor search algorithms. Args: - vectors (np.ndarray): Training vectors. + vectors (NDArrayFloat): Training vectors. """ vectors = self.normalize(vectors) self.index.train(vectors) - def add(self, vectors: np.ndarray, ids: Optional[np.ndarray] = None) -> None: + def add(self, vectors: NDArrayFloat, ids: Optional[NDArrayI64] = None) -> None: """Add key vectors to the index. Args: - vectors (np.ndarray): Key vectors to be added. - ids (np.ndarray, optional): Value indices. + vectors (NDArrayFloat): Key vectors to be added. + ids (NDArrayI64, optional): Value indices. 
""" vectors = self.normalize(vectors) if ids is None: ids = np.arange(len(self), len(self) + len(vectors)) return self.index.add_with_ids(vectors, ids) - def search(self, querys: np.ndarray, k: int = 1) -> Tuple[np.ndarray, np.ndarray]: + def search(self, querys: NDArrayFloat, k: int = 1) -> tuple[NDArrayF32, NDArrayI64]: """Search the k nearest neighbor vectors of the querys. Args: - querys (np.ndarray): Query vectors. + querys (NDArrayFloat): Query vectors. k (int): Top-k. Returns: - distances (np.ndarray): Distances between the querys and the k nearest + distances (NDArrayF32): Distances between the querys and the k nearest neighbor vectors. - indices (np.ndarray): Indices of the k nearest neighbor vectors. + indices (NDArrayI64): Indices of the k nearest neighbor vectors. """ querys = self.normalize(querys) return self.index.search(querys, k=k) @classmethod - def load_index(cls, path: PathLike) -> Any: + def load_index(cls, path: StrPath) -> faiss.Index: """Load the index. Args: - path (os.PathLike): Index file path. + path (StrPath): Index file path. Returns: faiss.Index: Index object. """ return faiss.read_index(str(path)) - def save_index(self, path: PathLike) -> None: - """Saves the index. + def save_index(self, path: StrPath) -> None: + """Save the index. Args: - path (os.PathLike): Index file path to save. + path (StrPath): Index file path to save. """ return faiss.write_index(self.index, str(path)) diff --git a/semsis/retriever/faiss_cpu_test.py b/src/semsis/retriever/faiss_cpu_test.py similarity index 100% rename from semsis/retriever/faiss_cpu_test.py rename to src/semsis/retriever/faiss_cpu_test.py diff --git a/semsis/retriever/faiss_gpu.py b/src/semsis/retriever/faiss_gpu.py similarity index 86% rename from semsis/retriever/faiss_gpu.py rename to src/semsis/retriever/faiss_gpu.py index 01fa4ca..9d0b908 100644 --- a/semsis/retriever/faiss_gpu.py +++ b/src/semsis/retriever/faiss_gpu.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from dataclasses import dataclass from typing import Optional, Union @@ -8,6 +10,7 @@ from semsis.retriever import register from semsis.retriever.faiss_cpu import RetrieverFaissCPU +from semsis.typing import NDArrayF32, NDArrayFloat, NDArrayI64 logger = logging.getLogger(__name__) @@ -65,35 +68,14 @@ class RetrieverFaissGPU(RetrieverFaissCPU): cfg (RetrieverFaissGPU.Config): Configuration dataclass. """ - cfg: "RetrieverFaissGPU.Config" + cfg: RetrieverFaissGPU.Config - def __init__(self, index: faiss.Index, cfg: "RetrieverFaissGPU.Config") -> None: + def __init__(self, index: faiss.Index, cfg: RetrieverFaissGPU.Config) -> None: super().__init__(index, cfg) self.A: Optional[torch.Tensor] = None self.b: Optional[torch.Tensor] = None self.gpu_ivf_index: Optional[ShardedGpuIndex] = None - @dataclass - class Config(RetrieverFaissCPU.Config): - """Configuration of the retriever. - - - dim (int): Size of the dimension. - - backend (str): Backend of the search engine. - - metric (str): Distance function. - - hnsw_nlinks (int): [HNSW] Number of links for each node. - If this value is greater than 0, HNSW will be used. - - ivf_nlists (int): [IVF] Number of centroids. - - pq_nblocks (int): [PQ] Number of sub-vectors to be splitted. - - pq_nbits (int): [PQ] Size of codebooks for each sub-space. - Usually 8 bit is employed; thus, each codebook has 256 codes. - - opq (bool): [OPQ] Use OPQ pre-transformation which minimizes the quantization error. - - pca (bool): [PCA] Use PCA dimension reduction. 
-        - pca_dim (int): [PCA] Dimension size which is reduced by PCA.
-        - fp16 (bool): Use FP16.
-        """
-
-        fp16: bool = False
-
     def to_gpu_train(self) -> None:
         """Transfers the faiss index to GPUs for training.
@@ -196,7 +178,7 @@ def to_gpu_search(self) -> None:
                 faiss.downcast_index(ivf_index.quantizer).storage
             )
         self.index = faiss_index_to_gpu(self.index, fp16=self.cfg.fp16)
-        logger.info(f"The retriever index is on the GPU.")
+        logger.info("The retriever index is on the GPU.")
 
     def to_cpu(self) -> None:
         """Transfers the faiss index to CPUs."""
@@ -237,7 +219,7 @@ def rotate(self, x: torch.Tensor, shard_size: int = 2**20) -> torch.Tensor:
             i = j
         return torch.cat(results, dim=0)
 
-    def add_gpu_ivf_index(self, vectors: np.ndarray, ids: np.ndarray) -> None:
+    def add_gpu_ivf_index(self, vectors: NDArrayF32, ids: NDArrayI64) -> None:
         """Adds vectors to the index with the full GPU IVF index.
 
         This method runs as follows:
@@ -249,8 +231,8 @@ def add_gpu_ivf_index(self, vectors: np.ndarray, ids: np.ndarray) -> None:
         5. Empties the storage of the GPU index. Here, the GPU index has only centroids.
 
         Args:
-            vectors (ndarray): Key vectors to be added.
-            ids (np.ndarray): Value indices.
+            vectors (NDArrayF32): Key vectors to be added.
+            ids (NDArrayI64): Value indices.
         """
         ivf: faiss.IndexIVF = faiss.extract_index_ivf(self.index)
         self.gpu_ivf_index.add_with_ids(vectors, ids)
@@ -259,12 +241,12 @@ def add_gpu_ivf_index(self, vectors: np.ndarray, ids: np.ndarray) -> None:
         ivf.merge_from(cpu_ivf_index, 0)
         self.gpu_ivf_index.reset()
 
-    def add(self, vectors: np.ndarray, ids: Optional[np.ndarray] = None) -> None:
+    def add(self, vectors: NDArrayFloat, ids: Optional[NDArrayI64] = None) -> None:
         """Add key vectors to the index.
 
         Args:
-            vectors (np.ndarray): Key vectors to be added.
-            ids (np.ndarray, optional): Value indices.
+            vectors (NDArrayFloat): Key vectors to be added.
+            ids (NDArrayI64, optional): Value indices.
         """
         vectors = self.normalize(vectors)
         if self.cfg.transform:
diff --git a/semsis/retriever/faiss_gpu_test.py b/src/semsis/retriever/faiss_gpu_test.py
similarity index 100%
rename from semsis/retriever/faiss_gpu_test.py
rename to src/semsis/retriever/faiss_gpu_test.py
diff --git a/src/semsis/typing.py b/src/semsis/typing.py
new file mode 100644
index 0000000..7636c88
--- /dev/null
+++ b/src/semsis/typing.py
@@ -0,0 +1,11 @@
+import os
+from typing import Union
+
+import numpy as np
+from numpy.typing import NDArray
+
+StrPath = Union[str, os.PathLike[str]]
+NDArrayI64 = NDArray[np.int64]
+NDArrayF32 = NDArray[np.float32]
+NDArrayF16 = NDArray[np.float16]
+NDArrayFloat = Union[NDArrayF32, NDArrayF16]
diff --git a/semsis/utils.py b/src/semsis/utils.py
similarity index 100%
rename from semsis/utils.py
rename to src/semsis/utils.py
diff --git a/semsis/utils_test.py b/src/semsis/utils_test.py
similarity index 100%
rename from semsis/utils_test.py
rename to src/semsis/utils_test.py
diff --git a/tests/end2end_test.py b/tests/end2end_test.py
index a3570b6..58a4157 100644
--- a/tests/end2end_test.py
+++ b/tests/end2end_test.py
@@ -1,5 +1,8 @@
 #!/usr/bin/env python3
+import json
 import math
+import subprocess
+import sys
 from pathlib import Path
 
 import numpy as np
@@ -30,7 +33,7 @@
 @pytest.mark.parametrize(
     "model", ["bert-base-uncased", "sentence-transformers/all-MiniLM-L6-v2"]
 )
-def test_end2end(tmp_path: Path, model, representation):
+def test_end2end_py(tmp_path: Path, model, representation):
     # 1. Encode the sentences and store in a key--value store.
     encoder = SentenceEncoder.build(model, representation)
     dim = encoder.get_embed_dim()
@@ -65,5 +68,165 @@ def test_end2end(tmp_path: Path, model, representation):
     assert np.isclose(distances[2, 0], 0.0)
 
 
+@pytest.mark.parametrize("representation", ["cls", "avg", "sbert"])
+@pytest.mark.parametrize(
+    "model", ["bert-base-uncased", "sentence-transformers/all-MiniLM-L6-v2"]
+)
+def test_end2end_cli(tmp_path: Path, model: str, representation: str):
+    # 1. Encode the sentences and store in a key--value store.
+    with open(tmp_path / "text.txt", mode="w") as f:
+        for text in TEXT:
+            print(text, file=f)
+
+    encode_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.store_kv",
+        "--input",
+        str(tmp_path / "text.txt"),
+        "--output",
+        str(tmp_path / "kv.bin"),
+        "--model",
+        model,
+        "--representation",
+        representation,
+    ]
+    subprocess.run(encode_cmds)
+
+    # 2. Read the KVStore and build the kNN index.
+    index_path = str(tmp_path / "index.bin")
+    cfg_path = str(tmp_path / "cfg.yaml")
+    index_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.build_retriever",
+        "--kvstore",
+        str(tmp_path / "kv.bin"),
+        "--index_path",
+        index_path,
+        "--config_path",
+        cfg_path,
+    ]
+    subprocess.run(index_cmds)
+
+    # 3. Query.
+    query_path = str(tmp_path / "query.txt")
+    output_path = str(tmp_path / "output.json")
+    search_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.query_interactive",
+        "--index_path",
+        index_path,
+        "--config_path",
+        cfg_path,
+        "--input",
+        query_path,
+        "--output",
+        output_path,
+        "--format",
+        "json",
+        "--model",
+        model,
+        "--representation",
+        representation,
+    ]
+
+    with open(query_path, mode="w") as f:
+        for query in QUERYS:
+            print(query, file=f)
+
+    subprocess.run(search_cmds)
+
+    with open(output_path, mode="r") as f:
+        for i, line in enumerate(f):
+            res = json.loads(line)
+            assert res["results"][0]["idx"] == [2, 0, 2][i]
+            if i == 2:
+                assert np.isclose(res["results"][0]["distance"], 0.0)
+
+
+def test_end2end_cli_e5_small_v2(tmp_path: Path):
+    model = "intfloat/e5-small-v2"
+    representation = "sbert"
+
+    # 1. Encode the sentences and store in a key--value store.
+    with open(tmp_path / "text.txt", mode="w") as f:
+        for text in TEXT:
+            print(text, file=f)
+
+    encode_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.store_kv",
+        "--input",
+        str(tmp_path / "text.txt"),
+        "--output",
+        str(tmp_path / "kv.bin"),
+        "--model",
+        model,
+        "--representation",
+        representation,
+        "--prefix_string",
+        "passage: ",
+    ]
+    subprocess.run(encode_cmds)
+
+    # 2. Read the KVStore and build the kNN index.
+    index_path = str(tmp_path / "index.bin")
+    cfg_path = str(tmp_path / "cfg.yaml")
+    index_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.build_retriever",
+        "--kvstore",
+        str(tmp_path / "kv.bin"),
+        "--index_path",
+        index_path,
+        "--config_path",
+        cfg_path,
+    ]
+    subprocess.run(index_cmds)
+
+    # 3. Query.
+    query_path = str(tmp_path / "query.txt")
+    output_path = str(tmp_path / "output.json")
+    search_cmds: list[str] = [
+        sys.executable,
+        "-m",
+        "semsis.cli.query_interactive",
+        "--index_path",
+        index_path,
+        "--config_path",
+        cfg_path,
+        "--input",
+        query_path,
+        "--output",
+        output_path,
+        "--format",
+        "json",
+        "--model",
+        model,
+        "--representation",
+        representation,
+        "--prefix_string",
+        "query: ",
+    ]
+
+    with open(query_path, mode="w") as f:
+        for query in QUERYS:
+            print(query, file=f)
+
+    subprocess.run(search_cmds)
+
+    with open(output_path, mode="r") as f:
+        for i, line in enumerate(f):
+            res = json.loads(line)
+            assert res["results"][0]["idx"] == [2, 0, 2][i]
+            if i == 2:
+                # `not isclose`: because prefix_string is asymmetric between the text and the query.
+                assert not np.isclose(res["results"][0]["distance"], 0.0)
+
+
 if __name__ == "__main__":
     pytest.main()
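For orientation, here is a minimal usage sketch of the retriever API touched by this patch (illustrative only, not part of the diff; it assumes `RetrieverFaissCPU.Config` can be instantiated with just `dim` and that the other documented fields keep their flat-index defaults, i.e. `hnsw_nlinks`/`ivf_nlists`/`pq_nblocks` stay at 0):

    import numpy as np

    from semsis.retriever.faiss_cpu import RetrieverFaissCPU

    # Hypothetical values: dim=64 is arbitrary and must match the key vectors below.
    cfg = RetrieverFaissCPU.Config(dim=64)
    retriever = RetrieverFaissCPU.build(cfg)

    keys = np.random.rand(100, 64).astype(np.float32)  # NDArrayF32 key vectors
    retriever.train(keys)  # effectively a no-op for a flat index; required for IVF/PQ configurations
    retriever.add(keys)    # ids default to np.arange(len(retriever), len(retriever) + len(keys))
    distances, indices = retriever.search(keys[:3], k=5)
    retriever.save_index("index.bin")  # StrPath (str or os.PathLike[str]) is accepted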