Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions from_mwt_ds/DataScience/vw_executor/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import re

def _extract_commit(s):
    """Return the git commit id embedded in a version string, if any.

    Looks for a ``(git commit: <id>)`` fragment in ``s`` and returns ``<id>``
    (the run of non-whitespace characters before the closing parenthesis).

    Args:
        s: Version text, e.g. ``'9.1.0 (git commit: a1b2c3d)'``.

    Returns:
        The commit id as a string, or ``None`` when ``s`` has no such fragment.
    """
    # Raw string: the pattern contains regex escapes (\(, \s, \S) that are
    # invalid *string* escapes and trigger warnings in a plain literal.
    result = re.search(r'.*\(git commit:\s+(\S+)\)', s)
    return result.group(1) if result else None


class Version:
    """Parsed version of a VW implementation.

    Attributes set by the constructor:
        kind: Caller-supplied label for the implementation.
        major, minor, rev: The three integer components of the version number.
        commit: Git commit id found in the version text, or ``None``.
        pattern: The string form of this version (same value as ``str(self)``).
    """

    def __init__(self, kind: str, version: str):
        """Parse ``version`` into its numeric components and optional commit.

        Only the first whitespace-separated token of ``version`` is read as the
        ``major.minor.rev`` number; the whole text is searched for a commit id.

        Args:
            kind: Label stored as-is and used as the prefix of the string form.
            version: Version text such as ``'9.1.0'`` or
                ``'9.1.0 (git commit: a1b2c3d)'``.

        Raises:
            ValueError: If ``version`` is blank, a component is not an integer,
                or the number does not have exactly three components.
        """
        self.kind = kind
        tokens = version.split()
        if not tokens:
            raise ValueError(f'Empty version string: {version!r}')
        parts = [int(i) for i in tokens[0].split('.')]
        # Explicit check instead of `assert`, which is removed under `python -O`.
        if len(parts) != 3:
            raise ValueError(
                f'Expected a version of the form major.minor.rev, got: {tokens[0]!r}')
        self.major, self.minor, self.rev = parts
        self.commit = _extract_commit(version)
        self.pattern = str(self)

    def __str__(self):
        """Return ``'<kind>-<major>.<minor>.<rev>-<commit>'`` (empty commit if unknown)."""
        return f'{self.kind}-{self.major}.{self.minor}.{self.rev}-{self.commit or ""}'
50 changes: 39 additions & 11 deletions from_mwt_ds/DataScience/vw_executor/vw.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from vw_executor.artifacts import Output, Predictions, Model8, Model9, Model
from vw_executor.pool import SeqPool, MultiThreadPool, Pool
from vw_executor.loggers import MultiLogger, ILogger
from vw_executor.vw_cache import VwCache
from vw_executor.vw_cache import VwCache, create_cache
from vw_executor.handlers import MultiHandler, HandlerBase, ProgressBars
from vw_executor.vw_opts import VwOpts, InteractiveGrid, VwOptsLike, GridLike
from vw_executor.version import Version

from typing import Iterable, Optional, Union, Dict, Any, Type, List
from abc import ABC, abstractmethod
from abc import ABC, abstractmethod, abstractproperty


def _save(txt: Union[str, Iterable[str]], path: Path) -> None:
Expand Down Expand Up @@ -42,12 +43,17 @@ def __init__(self, path: Optional[Path]):
def run(self, args: str) -> Union[str, List[str]]:
...

@property
@abstractmethod
def version(self) -> Version:
    """Version of the VW implementation behind this core.

    Abstract read-only property; every concrete core must override it.
    Declared with ``@property`` over ``@abstractmethod`` because
    ``abc.abstractproperty`` is deprecated.
    """
    ...


class _VwBin(_VwCore):
def __init__(self, path: Path):
super().__init__(path)
self._version = Version(kind='vw', version=self.run("--version", stdout=True).strip())

def run(self, args: str) -> str:
def run(self, args: str, stdout: bool = False) -> str:
command = f'{self.path} {args}'
process = subprocess.Popen(
command.split(),
Expand All @@ -56,9 +62,13 @@ def run(self, args: str) -> str:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
error = process.communicate()[1]
error = process.communicate()[0 if stdout else 1]
return error

@property
def version(self) -> Version:
return self._version


def _run_pyvw(args: str) -> Iterable[str]:
from vowpalwabbit import pyvw
Expand All @@ -69,13 +79,19 @@ def _run_pyvw(args: str) -> Iterable[str]:

class _VwPy(_VwCore):
    """VW core that runs through the ``vowpalwabbit`` Python package (no binary path)."""

    def __init__(self):
        """Initialise the core with no path and record the package version."""
        # The import lives in the constructor, so the package is only loaded
        # when a _VwPy instance is actually created.
        import vowpalwabbit as bindings
        super().__init__(None)
        self._version = Version(version=bindings.__version__, kind='pyvw')

    def run(self, args: str) -> Iterable[str]:
        """Execute ``_run_pyvw(args)`` in a one-worker process pool and return its result."""
        import multiprocessing
        with multiprocessing.Pool(1) as worker:
            return worker.apply(_run_pyvw, (args,))

    @property
    def version(self) -> Version:
        """Version object built in the constructor from ``vowpalwabbit.__version__``."""
        return self._version


class Task:
job: 'Job'
Expand Down Expand Up @@ -123,10 +139,14 @@ def _prepare_args(self, cache: VwCache) -> str:
if self.model_file:
opts['-i'] = self.model_file

self.outputs_relative = {o: cache.get_path(opts, self._logger, o, salt) for o in self.job.outputs.keys()}
self.outputs = {o: cache.path.joinpath(p) for o, p in self.outputs_relative.items()}
self.outputs_relative = {
o: cache.get_path_for_hash(opts, self._logger, o, salt)
for o in self.job.outputs.keys()
}
self.outputs = {o: cache.get_path(p, self.job.core.version) for o, p in self.outputs_relative.items()}

self.stdout = Output(cache.path.joinpath(cache.get_path(opts, self._logger, None, salt)))
self.stdout = Output(cache.get_path(
cache.get_path_for_hash(opts, self._logger, None, salt), self.job.core.version))

if self.model_file:
opts['-i'] = self.model_folder.joinpath(self.model_file)
Expand Down Expand Up @@ -332,8 +352,10 @@ def __init__(self,
no_run: bool = False,
reset: bool = False,
handler: Optional[HandlerBase] = ProgressBars(),
logger: Optional[ILogger] = None):
self._cache = VwCache(_assert_path_is_supported(cache_path))
logger: Optional[ILogger] = None,
cache_version: int = 1):
self._cache_version = cache_version
self._cache = create_cache(_assert_path_is_supported(cache_path), version=cache_version)
self._vw = _VwBin(path) if path is not None else _VwPy()
self.logger = logger or MultiLogger([])
self.pool = SeqPool() if procs == 1 else MultiThreadPool(procs)
Expand All @@ -349,14 +371,20 @@ def _with(self,
no_run: Optional[bool] = None,
reset: Optional[bool] = None,
handler: Optional[HandlerBase] = None,
logger: Optional[ILogger] = None) -> 'Vw':
logger: Optional[ILogger] = None,
cache_version: Optional[int] = None) -> 'Vw':
return Vw(cache_path or self._cache.path,
path or self._vw.path,
procs or self.pool.procs,
no_run if no_run is not None else self.no_run,
reset if reset is not None else self.reset,
handler or self.handler,
logger or self.logger)
logger or self.logger,
cache_version or self._cache_version)

@property
def version(self) -> Version:
    """Version of the VW core used by this executor (delegates to ``self._vw.version``)."""
    return self._vw.version

def _run_impl(self,
inputs: List[Path],
Expand Down
51 changes: 45 additions & 6 deletions from_mwt_ds/DataScience/vw_executor/vw_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@

from vw_executor.loggers import MultiLogger
from vw_executor.vw_opts import VwOptsLike, VwOpts
from vw_executor.version import Version

from typing import Optional, Union

from abc import ABC, abstractmethod, abstractproperty

class VwCache:
path: Path

class VwCache(ABC):
def __init__(self, path: Union[str, Path]):
self.path = Path(path)
self.path.mkdir(parents=True, exist_ok=True)

def _get_path(self, context: str, args_hash: str) -> Path:
folder = self.path.joinpath(context)
folder.mkdir(parents=True, exist_ok=True)
return Path(context).joinpath(args_hash)

def get_path(self,
def get_path_for_hash(self,
opts: VwOptsLike,
logger: MultiLogger,
output: Optional[str] = None,
Expand All @@ -27,3 +25,44 @@ def get_path(self,
result = self._get_path(f'cache{output}', args_hash)
logger.debug(f'Generating path for opts: {str(opts)}, output: {output}. Result: {result}')
return result

@abstractmethod
def get_path(self,
             path_for_hash: Path,
             version: Version,
             logger: Optional[MultiLogger] = None) -> Path:
    """Resolve a hash-relative path to its location inside this cache.

    Args:
        path_for_hash: Relative path as produced by ``get_path_for_hash``.
        version: VW version the cached artifact belongs to.
        logger: Optional logger. It defaults to ``None`` so the two-argument
            call form ``get_path(path_for_hash, version)`` satisfies this
            declaration.

    Returns:
        The path under the cache root for the requested artifact.
    """
    ...

class _VwCache1(VwCache):
    """Cache layout 1: artifacts are stored directly under the cache root.

    The VW version plays no role in where an artifact is placed.
    """
    path: Path

    def __init__(self, path: Union[str, Path]):
        """Create the cache rooted at ``path`` (handled by ``VwCache.__init__``)."""
        super().__init__(path)

    def get_path(self,
                 path_for_hash: Path,
                 _: Version,
                 logger: Optional[MultiLogger] = None) -> Path:
        """Return ``<root>/<path_for_hash>``, creating its parent folder if needed.

        Args:
            path_for_hash: Relative path of the artifact inside the cache.
            _: VW version; ignored by this layout.
            logger: Accepted so the override matches the three-parameter
                signature declared on ``VwCache.get_path``; not used here.

        Returns:
            The full path of the artifact under the cache root.
        """
        full_path = self.path.joinpath(path_for_hash)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        return full_path



class _VwCache2(VwCache):
path: Path

def __init__(self, path: Union[str, Path]):
    """Root this cache in the ``v2`` subfolder of ``path``."""
    v2_root = Path(path) / 'v2'
    super().__init__(v2_root)

def get_path(self,
             path_for_hash: Path,
             version: Version,
             logger: Optional[MultiLogger] = None) -> Path:
    """Return the version-specific path for an artifact, creating its folder.

    In this layout ``<root>/<path_for_hash>`` is a directory. An existing
    entry in it that matches ``version.pattern`` is reused; otherwise the
    path ``<root>/<path_for_hash>/<str(version)>`` is returned.

    Args:
        path_for_hash: Relative path of the artifact inside the cache.
        version: VW version whose entry is looked up.
        logger: Accepted so the override matches the three-parameter
            signature declared on ``VwCache.get_path``; not used here.

    Returns:
        Path of the matching entry, or of the entry to be created.
    """
    full_path = self.path.joinpath(path_for_hash)
    full_path.mkdir(parents=True, exist_ok=True)
    # Take the first glob hit, if any; None means nothing is cached yet.
    match = next(full_path.glob(version.pattern), None)
    return match or full_path.joinpath(str(version))

def get_model_path(self, path_for_hash: Path, instance: str):

def create_cache(path: Union[str, Path], version: int) -> 'VwCache':
    """Build the cache implementation for the requested layout version.

    Args:
        path: Root folder of the cache.
        version: Cache layout version; ``1`` and ``2`` are supported.

    Returns:
        A ``_VwCache1`` for version 1 or a ``_VwCache2`` for version 2.

    Raises:
        ValueError: If ``version`` is not a supported layout version. Failing
            here avoids handing back ``None`` and crashing on first use.
    """
    if version == 1:
        return _VwCache1(path)
    if version == 2:
        return _VwCache2(path)
    raise ValueError(f'Unsupported cache version: {version!r} (expected 1 or 2)')