From 0e275d51778e064ce6bd701a366bc42b93d14212 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 15:49:39 -0400 Subject: [PATCH 1/8] vw.version --- from_mwt_ds/DataScience/vw_executor/vw.py | 24 ++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index 28231c4..2a0d6bd 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -15,7 +15,7 @@ from vw_executor.vw_opts import VwOpts, InteractiveGrid, VwOptsLike, GridLike from typing import Iterable, Optional, Union, Dict, Any, Type, List -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod, abstractproperty def _save(txt: Union[str, Iterable[str]], path: Path) -> None: @@ -43,12 +43,17 @@ def __init__(self, path: Optional[Path]): def run(self, args: str) -> Union[str, List[str]]: ... + @abstractproperty + def version(self): + ... + class _VwBin(_VwCore): def __init__(self, path: Path): super().__init__(path) + self._version = f'vw@{self.run("--version", stdout=True).strip()}' - def run(self, args: str) -> str: + def run(self, args: str, stdout: bool = False) -> str: command = f'{self.path} {args}' process = subprocess.Popen( command.split(), @@ -57,9 +62,13 @@ def run(self, args: str) -> str: stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - error = process.communicate()[1] + error = process.communicate()[0 if stdout else 1] return error + @property + def version(self): + return self._version + def _run_pyvw(args: str) -> Iterable[str]: from vowpalwabbit import pyvw @@ -77,6 +86,11 @@ def run(self, args: str) -> Iterable[str]: with Pool(1) as p: return p.apply(_run_pyvw, [args]) + @property + def version(self): + import vowpalwabbit + return f'pyvw-{vowpalwabbit.__version__}' + class Task: job: 'Job' @@ -359,6 +373,10 @@ def _with(self, handlers if handlers is not None else self.handler.handlers, loggers if loggers is not None else self.logger.loggers) + @property + def version(self): + return self._vw.version + def _run_impl(self, inputs: List[Path], opts: VwOptsLike, From 8d104e1a31d9f5fc648c4ab16ed8527772c26fcf Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 15:49:50 -0400 Subject: [PATCH 2/8] version bump --- from_mwt_ds/DataScience/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/from_mwt_ds/DataScience/setup.py b/from_mwt_ds/DataScience/setup.py index 3377bab..7d61bbd 100644 --- a/from_mwt_ds/DataScience/setup.py +++ b/from_mwt_ds/DataScience/setup.py @@ -3,7 +3,7 @@ MAJOR = 0 MINOR = 1 -MICRO = 13 +MICRO = 14 VERSION = f"{MAJOR}.{MINOR}.{MICRO}" with open("README.md", "r") as f: From 0202f2989efbb4dff947c73a2336bf02b52e9f37 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 15:57:41 -0400 Subject: [PATCH 3/8] more consistent versioning --- from_mwt_ds/DataScience/vw_executor/vw.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index 2a0d6bd..b6b7f9b 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -51,7 +51,7 @@ def version(self): class _VwBin(_VwCore): def __init__(self, path: Path): super().__init__(path) - self._version = f'vw@{self.run("--version", stdout=True).strip()}' + self._version = f'vw-{self.run("--version", stdout=True).strip().split()[0]}' def run(self, args: str, stdout: bool = False) -> str: command = f'{self.path} {args}' From a250f061a8cff9b367294992da02f070909f7027 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 16:38:43 -0400 Subject: [PATCH 4/8] structured version --- from_mwt_ds/DataScience/vw_executor/vw.py | 35 +++++++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index b6b7f9b..956d436 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -33,6 +33,28 @@ class ExecutionStatus(enum.Enum): Failed = 4 +def _extract_commit(s): + import re + result = re.search('.*\(git commit:\s+(\S+)\)', s) + if result: + return result.group(1) + return None + + +class Version: + def __init__(self, kind: str, version: str): + self.kind = kind + parts = [int(i) for i in version.split()[0].split('.')] + assert len(parts) == 3 + self.major = parts[0] + self.minor = parts[1] + self.rev = parts[2] + self.commit = _extract_commit(version) + + def __str__(self): + return f'{self.kind}-{self.major}.{self.minor}.{self.rev}@{self.commit or ""}' + + class _VwCore(ABC): path: Optional[Path] @@ -44,14 +66,14 @@ def run(self, args: str) -> Union[str, List[str]]: ... @abstractproperty - def version(self): + def version(self) -> Version: ... class _VwBin(_VwCore): def __init__(self, path: Path): super().__init__(path) - self._version = f'vw-{self.run("--version", stdout=True).strip().split()[0]}' + self._version = Version(kind='vw', version=self.run("--version", stdout=True).strip()) def run(self, args: str, stdout: bool = False) -> str: command = f'{self.path} {args}' @@ -66,7 +88,7 @@ def run(self, args: str, stdout: bool = False) -> str: return error @property - def version(self): + def version(self) -> Version: return self._version @@ -79,7 +101,9 @@ def _run_pyvw(args: str) -> Iterable[str]: class _VwPy(_VwCore): def __init__(self): + import vowpalwabbit super().__init__(None) + self._version = Version(kind='pyvw', version=vowpalwabbit.__version__) def run(self, args: str) -> Iterable[str]: from multiprocessing import Pool @@ -87,9 +111,8 @@ def run(self, args: str) -> Iterable[str]: return p.apply(_run_pyvw, [args]) @property - def version(self): - import vowpalwabbit - return f'pyvw-{vowpalwabbit.__version__}' + def version(self) -> Version: + return self._version class Task: From e86b67a95b54d1f4bedf3d280e9ac71656f460d4 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 19:14:40 -0400 Subject: [PATCH 5/8] vw chache: args hash as folder --- from_mwt_ds/DataScience/vw_executor/vw_cache.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw_cache.py b/from_mwt_ds/DataScience/vw_executor/vw_cache.py index c4cc464..8567a3b 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw_cache.py +++ b/from_mwt_ds/DataScience/vw_executor/vw_cache.py @@ -14,9 +14,9 @@ def __init__(self, path: Union[str, Path]): self.path.mkdir(parents=True, exist_ok=True) def _get_path(self, context: str, args_hash: str) -> Path: - folder = self.path.joinpath(context) - folder.mkdir(parents=True, exist_ok=True) - return Path(context).joinpath(args_hash) + result = Path(context).joinpath(args_hash) + self.path.joinpath(result).mkdir(parents=True, exist_ok=True) + return result.joinpath('default') def get_path(self, opts: VwOptsLike, From de93382f0e6e31a14abc8cb623a1124d78ae0a65 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Thu, 23 Jun 2022 19:40:16 -0400 Subject: [PATCH 6/8] cache version --- from_mwt_ds/DataScience/vw_executor/vw.py | 12 ++++++++---- .../DataScience/vw_executor/vw_cache.py | 18 +++++++++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index 956d436..fb755ab 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -370,8 +370,10 @@ def __init__(self, no_run: bool = False, reset: bool = False, handlers: Union[HandlerBase, List[HandlerBase]] = ProgressBars(), - loggers: Optional[List[ILogger]] = None): - self._cache = VwCache(_assert_path_is_supported(cache_path)) + loggers: Optional[List[ILogger]] = None, + cache_version: int = 1): + self._cache_version = cache_version + self._cache = VwCache(_assert_path_is_supported(cache_path), version=cache_version) self._vw = _VwBin(path) if path is not None else _VwPy() self.logger = MultiLogger(loggers or []) self.pool = SeqPool() if procs == 1 else MultiThreadPool(procs) @@ -387,14 +389,16 @@ def _with(self, no_run: Optional[bool] = None, reset: Optional[bool] = None, handlers: Optional[List[HandlerBase]] = None, - loggers: Optional[List[ILogger]] = None) -> 'Vw': + loggers: Optional[List[ILogger]] = None, + cache_version: Optional[int] = None) -> 'Vw': return Vw(cache_path or self._cache.path, path or self._vw.path, procs or self.pool.procs, no_run if no_run is not None else self.no_run, reset if reset is not None else self.reset, handlers if handlers is not None else self.handler.handlers, - loggers if loggers is not None else self.logger.loggers) + loggers if loggers is not None else self.logger.loggers, + cache_version or self._cache_version) @property def version(self): diff --git a/from_mwt_ds/DataScience/vw_executor/vw_cache.py b/from_mwt_ds/DataScience/vw_executor/vw_cache.py index 8567a3b..fb64385 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw_cache.py +++ b/from_mwt_ds/DataScience/vw_executor/vw_cache.py @@ -9,11 +9,22 @@ class VwCache: path: Path - def __init__(self, path: Union[str, Path]): + def __init__(self, path: Union[str, Path], version): self.path = Path(path) + self.impl = None + if version == 1: + self.impl = self._impl1 + elif version == 2: + self.path = self.path.joinpath('2') + self.impl = self._impl2 self.path.mkdir(parents=True, exist_ok=True) - def _get_path(self, context: str, args_hash: str) -> Path: + def _impl1(self, context: str, args_hash: str) -> Path: + folder = self.path.joinpath(context) + folder.mkdir(parents=True, exist_ok=True) + return Path(context).joinpath(args_hash) + + def _impl2(self, context: str, args_hash: str) -> Path: result = Path(context).joinpath(args_hash) self.path.joinpath(result).mkdir(parents=True, exist_ok=True) return result.joinpath('default') @@ -24,6 +35,7 @@ def get_path(self, output: Optional[str] = None, salt: Optional[int] = None) -> Path: args_hash = VwOpts(dict(opts, **{'-#': salt})).hash() - result = self._get_path(f'cache{output}', args_hash) + result = self.impl(f'cache{output}', args_hash) logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') return result + From 0bcadc2f554082a03f40bb223a20678fc5e8db67 Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Fri, 24 Jun 2022 10:12:21 -0400 Subject: [PATCH 7/8] update --- .../DataScience/vw_executor/version.py | 22 +++++++ from_mwt_ds/DataScience/vw_executor/vw.py | 35 +++-------- .../DataScience/vw_executor/vw_cache.py | 63 ++++++++++++++----- 3 files changed, 80 insertions(+), 40 deletions(-) create mode 100644 from_mwt_ds/DataScience/vw_executor/version.py diff --git a/from_mwt_ds/DataScience/vw_executor/version.py b/from_mwt_ds/DataScience/vw_executor/version.py new file mode 100644 index 0000000..72c14cd --- /dev/null +++ b/from_mwt_ds/DataScience/vw_executor/version.py @@ -0,0 +1,22 @@ +import re + +def _extract_commit(s): + result = re.search('.*\(git commit:\s+(\S+)\)', s) + if result: + return result.group(1) + return None + + +class Version: + def __init__(self, kind: str, version: str): + self.kind = kind + parts = [int(i) for i in version.split()[0].split('.')] + assert len(parts) == 3 + self.major = parts[0] + self.minor = parts[1] + self.rev = parts[2] + self.commit = _extract_commit(version) + self.pattern = str(self) + + def __str__(self): + return f'{self.kind}-{self.major}.{self.minor}.{self.rev}-{self.commit or ""}' \ No newline at end of file diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index fb755ab..f3ec9fb 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -10,9 +10,10 @@ from vw_executor.pool import SeqPool, MultiThreadPool, Pool from vw_executor.loggers import MultiLogger, ILogger from vw_executor.handlers import MultiHandler -from vw_executor.vw_cache import VwCache +from vw_executor.vw_cache import create_cache, VwCache from vw_executor.handlers import HandlerBase, ProgressBars from vw_executor.vw_opts import VwOpts, InteractiveGrid, VwOptsLike, GridLike +from vw_executor.version import Version from typing import Iterable, Optional, Union, Dict, Any, Type, List from abc import ABC, abstractmethod, abstractproperty @@ -33,28 +34,6 @@ class ExecutionStatus(enum.Enum): Failed = 4 -def _extract_commit(s): - import re - result = re.search('.*\(git commit:\s+(\S+)\)', s) - if result: - return result.group(1) - return None - - -class Version: - def __init__(self, kind: str, version: str): - self.kind = kind - parts = [int(i) for i in version.split()[0].split('.')] - assert len(parts) == 3 - self.major = parts[0] - self.minor = parts[1] - self.rev = parts[2] - self.commit = _extract_commit(version) - - def __str__(self): - return f'{self.kind}-{self.major}.{self.minor}.{self.rev}@{self.commit or ""}' - - class _VwCore(ABC): path: Optional[Path] @@ -161,10 +140,14 @@ def _prepare_args(self, cache: VwCache) -> str: if self.model_file: opts['-i'] = self.model_file - self.outputs_relative = {o: cache.get_path(opts, self._logger, o, salt) for o in self.job.outputs.keys()} + self.outputs_relative = { + o: cache.get_path(opts, self._logger, self.job.core.version, o, salt) + for o in self.job.outputs.keys() + } self.outputs = {o: cache.path.joinpath(p) for o, p in self.outputs_relative.items()} - self.stdout = Output(cache.path.joinpath(cache.get_path(opts, self._logger, None, salt))) + self.stdout = Output(cache.path.joinpath( + cache.get_path(opts, self._logger, self.job.core.version, None, salt))) if self.model_file: opts['-i'] = self.model_folder.joinpath(self.model_file) @@ -373,7 +356,7 @@ def __init__(self, loggers: Optional[List[ILogger]] = None, cache_version: int = 1): self._cache_version = cache_version - self._cache = VwCache(_assert_path_is_supported(cache_path), version=cache_version) + self._cache = create_cache(_assert_path_is_supported(cache_path), version=cache_version) self._vw = _VwBin(path) if path is not None else _VwPy() self.logger = MultiLogger(loggers or []) self.pool = SeqPool() if procs == 1 else MultiThreadPool(procs) diff --git a/from_mwt_ds/DataScience/vw_executor/vw_cache.py b/from_mwt_ds/DataScience/vw_executor/vw_cache.py index fb64385..3c895bc 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw_cache.py +++ b/from_mwt_ds/DataScience/vw_executor/vw_cache.py @@ -2,40 +2,75 @@ from vw_executor.loggers import MultiLogger from vw_executor.vw_opts import VwOptsLike, VwOpts +from vw_executor.version import Version from typing import Optional, Union +from abc import ABC, abstractmethod, abstractproperty -class VwCache: +class VwCache(ABC): + @abstractmethod + def get_path(self, + opts: VwOptsLike, + logger: MultiLogger, + version: Version, + output: Optional[str] = None, + salt: Optional[int] = None): + ... + +class _VwCache1(VwCache): path: Path - def __init__(self, path: Union[str, Path], version): + def __init__(self, path: Union[str, Path]): self.path = Path(path) - self.impl = None - if version == 1: - self.impl = self._impl1 - elif version == 2: - self.path = self.path.joinpath('2') - self.impl = self._impl2 self.path.mkdir(parents=True, exist_ok=True) - def _impl1(self, context: str, args_hash: str) -> Path: + def _get_path(self, context: str, args_hash: str) -> Path: folder = self.path.joinpath(context) folder.mkdir(parents=True, exist_ok=True) return Path(context).joinpath(args_hash) - def _impl2(self, context: str, args_hash: str) -> Path: - result = Path(context).joinpath(args_hash) - self.path.joinpath(result).mkdir(parents=True, exist_ok=True) - return result.joinpath('default') + def get_path(self, + opts: VwOptsLike, + logger: MultiLogger, + version: Version, + output: Optional[str] = None, + salt: Optional[int] = None) -> Path: + args_hash = VwOpts(dict(opts, **{'-#': salt})).hash() + result = self._get_path(f'cache{output}', args_hash) + logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') + return result + +class _VwCache2(VwCache): + path: Path + + def __init__(self, path: Union[str, Path]): + self.path = Path(path).joinpath('2') + self.path.mkdir(parents=True, exist_ok=True) + + def _get_path(self, context: str, args_hash: str, version: Version) -> Path: + op_folder = Path(context).joinpath(args_hash) + full_path = self.path.joinpath(op_folder) + full_path.mkdir(parents=True, exist_ok=True) + match = next(full_path.glob(version.pattern), None) + if match: + return match.relative_to(self.path) + return op_folder.joinpath(str(version)) def get_path(self, opts: VwOptsLike, logger: MultiLogger, + version: Version, output: Optional[str] = None, salt: Optional[int] = None) -> Path: args_hash = VwOpts(dict(opts, **{'-#': salt})).hash() - result = self.impl(f'cache{output}', args_hash) + result = self._get_path(f'cache{output}', args_hash, version) logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') return result +def create_cache(path: Union[str, Path], version: int): + if version == 1: + return _VwCache1(path) + elif version == 2: + return _VwCache2(path) + return None From 119d13db91f110570000ab39b4fa059e5b357c7c Mon Sep 17 00:00:00 2001 From: Alexey Taymanov Date: Sun, 26 Jun 2022 15:50:19 -0400 Subject: [PATCH 8/8] everything except input model works --- from_mwt_ds/DataScience/vw_executor/vw.py | 8 +-- .../DataScience/vw_executor/vw_cache.py | 60 ++++++++----------- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/from_mwt_ds/DataScience/vw_executor/vw.py b/from_mwt_ds/DataScience/vw_executor/vw.py index f3ec9fb..1292de9 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw.py +++ b/from_mwt_ds/DataScience/vw_executor/vw.py @@ -141,13 +141,13 @@ def _prepare_args(self, cache: VwCache) -> str: opts['-i'] = self.model_file self.outputs_relative = { - o: cache.get_path(opts, self._logger, self.job.core.version, o, salt) + o: cache.get_path_for_hash(opts, self._logger, o, salt) for o in self.job.outputs.keys() } - self.outputs = {o: cache.path.joinpath(p) for o, p in self.outputs_relative.items()} + self.outputs = {o: cache.get_path(p, self.job.core.version) for o, p in self.outputs_relative.items()} - self.stdout = Output(cache.path.joinpath( - cache.get_path(opts, self._logger, self.job.core.version, None, salt))) + self.stdout = Output(cache.get_path( + cache.get_path_for_hash(opts, self._logger, None, salt), self.job.core.version)) if self.model_file: opts['-i'] = self.model_folder.joinpath(self.model_file) diff --git a/from_mwt_ds/DataScience/vw_executor/vw_cache.py b/from_mwt_ds/DataScience/vw_executor/vw_cache.py index 3c895bc..c596511 100644 --- a/from_mwt_ds/DataScience/vw_executor/vw_cache.py +++ b/from_mwt_ds/DataScience/vw_executor/vw_cache.py @@ -9,31 +9,16 @@ from abc import ABC, abstractmethod, abstractproperty class VwCache(ABC): - @abstractmethod - def get_path(self, - opts: VwOptsLike, - logger: MultiLogger, - version: Version, - output: Optional[str] = None, - salt: Optional[int] = None): - ... - -class _VwCache1(VwCache): - path: Path - def __init__(self, path: Union[str, Path]): self.path = Path(path) self.path.mkdir(parents=True, exist_ok=True) def _get_path(self, context: str, args_hash: str) -> Path: - folder = self.path.joinpath(context) - folder.mkdir(parents=True, exist_ok=True) return Path(context).joinpath(args_hash) - def get_path(self, + def get_path_for_hash(self, opts: VwOptsLike, logger: MultiLogger, - version: Version, output: Optional[str] = None, salt: Optional[int] = None) -> Path: args_hash = VwOpts(dict(opts, **{'-#': salt})).hash() @@ -41,32 +26,39 @@ def get_path(self, logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') return result + @abstractmethod + def get_path(self, + path_for_hash: Path, + version: Version, + logger: MultiLogger): + ... + +class _VwCache1(VwCache): + path: Path + + def __init__(self, path: Union[str, Path]): + super().__init__(path) + + def get_path(self, path_for_hash: Path, _: Version): + full_path = self.path.joinpath(path_for_hash) + full_path.parent.mkdir(parents=True, exist_ok=True) + return full_path + + + class _VwCache2(VwCache): path: Path def __init__(self, path: Union[str, Path]): - self.path = Path(path).joinpath('2') - self.path.mkdir(parents=True, exist_ok=True) + super().__init__(Path(path).joinpath('v2')) - def _get_path(self, context: str, args_hash: str, version: Version) -> Path: - op_folder = Path(context).joinpath(args_hash) - full_path = self.path.joinpath(op_folder) + def get_path(self, path_for_hash: Path, version: Version): + full_path = self.path.joinpath(path_for_hash) full_path.mkdir(parents=True, exist_ok=True) match = next(full_path.glob(version.pattern), None) - if match: - return match.relative_to(self.path) - return op_folder.joinpath(str(version)) + return match or full_path.joinpath(str(version)) - def get_path(self, - opts: VwOptsLike, - logger: MultiLogger, - version: Version, - output: Optional[str] = None, - salt: Optional[int] = None) -> Path: - args_hash = VwOpts(dict(opts, **{'-#': salt})).hash() - result = self._get_path(f'cache{output}', args_hash, version) - logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}') - return result + def get_model_path(self, path_for_hash: Path, instance: str): def create_cache(path: Union[str, Path], version: int): if version == 1: