diff --git a/from_mwt_ds/DataScience/vw_executor/version.py b/from_mwt_ds/DataScience/vw_executor/version.py new file mode 100644 index 0000000..72c14cd --- /dev/null +++ b/from_mwt_ds/DataScience/vw_executor/version.py @@ -0,0 +1,22 @@ +import re + +def _extract_commit(s): + result = re.search('.*\(git commit:\s+(\S+)\)', s) + if result: + return result.group(1) + return None + + +class Version: + def __init__(self, kind: str, version: str): + self.kind = kind + parts = [int(i) for i in version.split()[0].split('.')] + assert len(parts) == 3 + self.major = parts[0] + self.minor = parts[1] + self.rev = parts[2] + self.commit = _extract_commit(version) + self.pattern = str(self) + + def __str__(self): + return f'{self.kind}-{self.major}.{self.minor}.{self.rev}-{self.commit or ""}' \ No newline at end of file diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index 123f179..e487dc9 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -9,12 +9,13 @@ from vw_executor.artifacts import Output, Predictions, Model8, Model9, Model from vw_executor.pool import SeqPool, MultiThreadPool, Pool from vw_executor.loggers import MultiLogger, ILogger -from vw_executor.vw_cache import VwCache +from vw_executor.vw_cache import VwCache, create_cache from vw_executor.handlers import MultiHandler, HandlerBase, ProgressBars from vw_executor.vw_opts import VwOpts, InteractiveGrid, VwOptsLike, GridLike +from vw_executor.version import Version from typing import Iterable, Optional, Union, Dict, Any, Type, List -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty def _save(txt: Union[str, Iterable[str]], path: Path) -> None: @@ -42,12 +43,17 @@ def __init__(self, path: Optional[Path]): def run(self, args: str) -> Union[str, List[str]]: ... + @abstractproperty + def version(self) -> Version: + ... + class _VwBin(_VwCore): def __init__(self, path: Path): super().__init__(path) + self._version = Version(kind='vw', version=self.run("--version", stdout=True).strip()) - def run(self, args: str) -> str: + def run(self, args: str, stdout: bool = False) -> str: command = f'{self.path} {args}' process = subprocess.Popen( command.split(), @@ -56,9 +62,13 @@ def run(self, args: str) -> str: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - error = process.communicate()[1] + error = process.communicate()[0 if stdout else 1] return error + @property + def version(self) -> Version: + return self._version + def _run_pyvw(args: str) -> Iterable[str]: from vowpalwabbit import pyvw @@ -69,13 +79,19 @@ def _run_pyvw(args: str) -> Iterable[str]: class _VwPy(_VwCore): def __init__(self): + import vowpalwabbit super().__init__(None) + self._version = Version(kind='pyvw', version=vowpalwabbit.__version__) def run(self, args: str) -> Iterable[str]: from multiprocessing import Pool with Pool(1) as p: return p.apply(_run_pyvw, [args]) + @property + def version(self) -> Version: + return self._version + class Task: job: 'Job' @@ -123,10 +139,14 @@ def _prepare_args(self, cache: VwCache) -> str: if self.model_file: opts['-i'] = self.model_file - self.outputs_relative = {o: cache.get_path(opts, self._logger, o, salt) for o in self.job.outputs.keys()} - self.outputs = {o: cache.path.joinpath(p) for o, p in self.outputs_relative.items()} + self.outputs_relative = { + o: cache.get_path_for_hash(opts, self._logger, o, salt) + for o in self.job.outputs.keys() + } + self.outputs = {o: cache.get_path(p, self.job.core.version) for o, p in self.outputs_relative.items()} - self.stdout = Output(cache.path.joinpath(cache.get_path(opts, self._logger, None, salt))) + self.stdout = Output(cache.get_path( + cache.get_path_for_hash(opts, self._logger, None, salt), self.job.core.version)) if self.model_file: opts['-i'] = self.model_folder.joinpath(self.model_file) @@ -332,8 +352,10 @@ def __init__(self, no_run: bool = False, reset: bool = False, handler: Optional[HandlerBase] = ProgressBars(), - logger: Optional[ILogger] = None): - self._cache = VwCache(_assert_path_is_supported(cache_path)) + logger: Optional[ILogger] = None, + cache_version: int = 1): + self._cache_version = cache_version + self._cache = create_cache(_assert_path_is_supported(cache_path), version=cache_version) self._vw = _VwBin(path) if path is not None else _VwPy() self.logger = logger or MultiLogger([]) self.pool = SeqPool() if procs == 1 else MultiThreadPool(procs) @@ -349,14 +371,20 @@ def _with(self, no_run: Optional[bool] = None, reset: Optional[bool] = None, handler: Optional[HandlerBase] = None, - logger: Optional[ILogger] = None) -> 'Vw': + logger: Optional[ILogger] = None, + cache_version: Optional[int] = None) -> 'Vw': return Vw(cache_path or self._cache.path, path or self._vw.path, procs or self.pool.procs, no_run if no_run is not None else self.no_run, reset if reset is not None else self.reset, handler or self.handler, - logger or self.logger) + logger or self.logger, + cache_version or self._cache_version) + + @property + def version(self): + return self._vw.version def _run_impl(self, inputs: List[Path], diff --git a/from_mwt_ds/DataScience/vw_executor/vw_cache.py b/from_mwt_ds/DataScience/vw_executor/vw_cache.py index c4cc464..c596511 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw_cache.py +++ b/from_mwt_ds/DataScience/vw_executor/vw_cache.py @@ -2,23 +2,21 @@ from vw_executor.loggers import MultiLogger from vw_executor.vw_opts import VwOptsLike, VwOpts +from vw_executor.version import Version from typing import Optional, Union +from abc import ABC, abstractmethod, abstractproperty -class VwCache: - path: Path - +class VwCache(ABC): def __init__(self, path: Union[str, Path]): self.path = Path(path) self.path.mkdir(parents=True, exist_ok=True) def _get_path(self, context: str, args_hash: str) -> Path: - folder = self.path.joinpath(context) - folder.mkdir(parents=True, exist_ok=True) return Path(context).joinpath(args_hash) - def get_path(self, + def get_path_for_hash(self, opts: VwOptsLike, logger: MultiLogger, output: Optional[str] = None, @@ -27,3 +25,44 @@ def get_path(self, result = self._get_path(f'cache{output}', args_hash) logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') return result + + @abstractmethod + def get_path(self, + path_for_hash: Path, + version: Version, + logger: MultiLogger): + ... + +class _VwCache1(VwCache): + path: Path + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + + def get_path(self, path_for_hash: Path, _: Version): + full_path = self.path.joinpath(path_for_hash) + full_path.parent.mkdir(parents=True, exist_ok=True) + return full_path + + + +class _VwCache2(VwCache): + path: Path + + def __init__(self, path: Union[str, Path]): + super().__init__(Path(path).joinpath('v2')) + + def get_path(self, path_for_hash: Path, version: Version): + full_path = self.path.joinpath(path_for_hash) + full_path.mkdir(parents=True, exist_ok=True) + match = next(full_path.glob(version.pattern), None) + return match or full_path.joinpath(str(version)) + + def get_model_path(self, path_for_hash: Path, instance: str): + +def create_cache(path: Union[str, Path], version: int): + if version == 1: + return _VwCache1(path) + elif version == 2: + return _VwCache2(path) + return None