Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Dependent] Fix task creation with cloud storage data #7903

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def _prepare_cloud_preview(self, db_storage):
manifest.set_index()
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
if not len(manifest):
continue
preview_info = manifest[0]
preview_info = manifest.get_first_not_empty_item()
preview_filename = ''.join([preview_info['name'], preview_info['extension']])
preview_path = os.path.join(manifest_prefix, preview_filename)
break
Expand Down
38 changes: 27 additions & 11 deletions cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import functools
import json
import os
import math
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved
from abc import ABC, abstractmethod, abstractproperty
from enum import Enum
from io import BytesIO
from multiprocessing.pool import ThreadPool
from typing import Dict, List, Optional, Any, Callable, TypeVar
from typing import Dict, List, Optional, Any, Callable, TypeVar, Iterator
from concurrent.futures import ThreadPoolExecutor
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved

import boto3
from azure.core.exceptions import HttpResponseError, ResourceExistsError
Expand All @@ -35,6 +37,10 @@

ImageFile.LOAD_TRUNCATED_IMAGES = True

CPU_NUMBER = get_cpu_number()
MAX_THREADS_NUMBER = 4
NUMBER_OF_FILES_PER_THREAD = 1000
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

class Status(str, Enum):
AVAILABLE = 'AVAILABLE'
NOT_FOUND = 'NOT_FOUND'
Expand Down Expand Up @@ -176,7 +182,7 @@ def optimally_image_download(self, key: str, chunk_size: int = 65536) -> BytesIO
Returns:
BytesIO: Buffer with image
"""
image_parser=ImageFile.Parser()
image_parser = ImageFile.Parser()
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

chunk = self.download_range_of_bytes(key, chunk_size - 1)
image_parser.feed(chunk)
Expand All @@ -196,29 +202,39 @@ def optimally_image_download(self, key: str, chunk_size: int = 65536) -> BytesIO
def bulk_download_to_memory(
self,
files: List[str],
threads_number: int = min(get_cpu_number(), 4),
*,
threads_number: Optional[int] = None,
_use_optimal_downloading: bool = True,
) -> List[BytesIO]:
) -> Iterator[BytesIO]:
func = self.optimally_image_download if _use_optimal_downloading else self.download_fileobj
if threads_number > 1:
with ThreadPool(threads_number) as pool:
return pool.map(func, files)
if threads_number is None:
threads_number = min(CPU_NUMBER, MAX_THREADS_NUMBER, max(math.ceil(len(files) / NUMBER_OF_FILES_PER_THREAD), 1))
else:
slogger.glob.warning('Download files to memory in series in one thread.')
return [func(f) for f in files]
threads_number = min(threads_number, CPU_NUMBER, MAX_THREADS_NUMBER)

if threads_number == 1:
slogger.glob.info('Download files to memory in series in one thread.')
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

with ThreadPoolExecutor(max_workers=threads_number) as executor:
yield from executor.map(func, files)
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved

def bulk_download_to_dir(
self,
files: List[str],
upload_dir: str,
threads_number: int = min(get_cpu_number(), 4),
threads_number: Optional[int] = None,
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
):
if threads_number is None:
threads_number = min(CPU_NUMBER, MAX_THREADS_NUMBER, max(math.ceil(len(files) / NUMBER_OF_FILES_PER_THREAD), 1))
else:
threads_number = min(threads_number, CPU_NUMBER, MAX_THREADS_NUMBER)

args = zip(files, [os.path.join(upload_dir, f) for f in files])
if threads_number > 1:
with ThreadPool(threads_number) as pool:
return pool.map(lambda x: self.download_file(*x), args)
else:
slogger.glob.warning(f'Download files to {upload_dir} directory in series in one thread.')
slogger.glob.info(f'Download files to {upload_dir} directory in series in one thread.')
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
for f, path in args:
self.download_file(f, path)

Expand Down
43 changes: 38 additions & 5 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,25 @@ def _create_task_manifest_from_cloud_data(
db_storage: models.CloudStorage,
sorted_media: List[str],
manifest: ImageManifestManager,
*,
stop_frame: int,
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
start_frame: int = 0,
step: int = 1,
dimension: models.DimensionType = models.DimensionType.DIM_2D,
) -> None:
cloud_storage_instance = db_storage_to_storage_instance(db_storage)
content = cloud_storage_instance.bulk_download_to_memory(sorted_media)
manifest.link(sources=content, DIM_3D=dimension == models.DimensionType.DIM_3D)
index_range = range(start_frame, stop_frame, step)
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
filtered_sorted_media = [sorted_media[idx] for idx in index_range]
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
content_generator = cloud_storage_instance.bulk_download_to_memory(filtered_sorted_media)

manifest.link(
sources=content_generator,
DIM_3D=dimension == models.DimensionType.DIM_3D,
stop=stop_frame,
start=start_frame,
step=step,
are_sources_filtered=True,
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
)
manifest.create()

@transaction.atomic
Expand Down Expand Up @@ -657,7 +671,20 @@ def _update_status(msg: str) -> None:
filtered_data = []
for files in (i for i in media.values() if i):
filtered_data.extend(files)
_download_data_from_cloud_storage(db_data.cloud_storage, filtered_data, upload_dir)
media_to_download = filtered_data

if media['image']:
start_frame = db_data.start_frame

stop_frame = len(filtered_data)
if data['stop_frame'] is not None:
stop_frame = min(stop_frame, data['stop_frame'] + 1)

step = db_data.get_frame_step()
if start_frame or step != 1 or stop_frame != len(filtered_data):
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
media_to_download = [filtered_data[idx] for idx in range(start_frame, stop_frame, step)]
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
_download_data_from_cloud_storage(db_data.cloud_storage, media_to_download, upload_dir)
del media_to_download
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
is_data_in_cloud = False
db_data.storage = models.StorageChoice.LOCAL
else:
Expand Down Expand Up @@ -686,7 +713,13 @@ def _update_status(msg: str) -> None:
cloud_storage_manifest, manifest)
else: # without manifest file but with use_cache option
# Define task manifest content based on list with uploaded files
_create_task_manifest_from_cloud_data(db_data.cloud_storage, sorted_media, manifest)
stop_frame = len(sorted_media)
if data['stop_frame'] is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactor code to remove duplicates

stop_frame = min(stop_frame, data['stop_frame'] + 1)
_create_task_manifest_from_cloud_data(
db_data.cloud_storage, sorted_media, manifest,
start_frame=db_data.start_frame, stop_frame=stop_frame, step=db_data.get_frame_step()
)

av_scan_paths(upload_dir)

Expand Down Expand Up @@ -913,7 +946,7 @@ def update_progress(progress):
if not is_data_in_cloud:
w, h = extractor.get_image_size(0)
else:
img_properties = manifest[0]
img_properties = manifest.get_first_not_empty_item()
w, h = img_properties['width'], img_properties['height']
area = h * w
db_data.chunk_size = max(2, min(72, 36 * 1920 * 1080 // area))
Expand Down
145 changes: 103 additions & 42 deletions utils/dataset_manifest/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
from contextlib import closing
from PIL import Image
from json.decoder import JSONDecodeError
from inspect import isgenerator

from .errors import InvalidManifestError, InvalidVideoError
from .utils import SortingMethod, md5_hash, rotate_image, sort

from typing import Dict, List, Union, Optional, Iterator, Tuple
from typing import Any, Dict, List, Union, Optional, Iterator, Tuple

class VideoStreamReader:
def __init__(self, source_path, chunk_size, force):
Expand Down Expand Up @@ -139,23 +140,33 @@ def __iter__(self) -> Iterator[Union[int, Tuple[int, int, str]]]:

class DatasetImagesReader:
def __init__(self,
sources: Union[List[str], List[BytesIO]],
*,
start: int = 0,
step: int = 1,
stop: Optional[int] = None,
meta: Optional[Dict[str, List[str]]] = None,
sorting_method: SortingMethod =SortingMethod.PREDEFINED,
use_image_hash: bool = False,
**kwargs):
self._raw_data_used = not isinstance(sources[0], str)
func = (lambda x: x.filename) if self._raw_data_used else None
self._sources = sort(sources, sorting_method, func=func)
sources: Union[List[str], Iterator[BytesIO]],
*,
start: int = 0,
step: int = 1,
stop: Optional[int] = None,
meta: Optional[Dict[str, List[str]]] = None,
sorting_method: SortingMethod =SortingMethod.PREDEFINED,
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
use_image_hash: bool = False,
are_sources_filtered: bool = False,
**kwargs
):
self._is_generator_used = isgenerator(sources)

if not self._is_generator_used:
raw_data_used = not isinstance(sources[0], str)
func = (lambda x: x.filename) if raw_data_used else None
self._sources = sort(sources, sorting_method, func=func)
else:
if sorting_method != SortingMethod.PREDEFINED:
raise ValueError('Only SortingMethod.PREDEFINED can be used with generator')
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved
self._sources = sources
self._meta = meta
self._data_dir = kwargs.get('data_dir', None)
self._use_image_hash = use_image_hash
self._start = start
self._stop = stop if stop else len(sources)
self._stop = stop if stop or self._is_generator_used else len(sources)
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
self.are_sources_filtered = are_sources_filtered
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
self._step = step

@property
Expand All @@ -182,37 +193,69 @@ def step(self):
def step(self, value):
self._step = int(value)

def __iter__(self):
sources = (i for i in self._sources)
for idx in range(self._stop):
if idx in self.range_:
image = next(sources)
img = Image.open(image, mode='r')
def _get_img_properties(self, image: Union[str, BytesIO]) -> Dict[str, Any]:
img = Image.open(image, mode='r')
if self._data_dir:
img_name = os.path.relpath(image, self._data_dir)
else:
img_name = os.path.basename(image) if isinstance(image, str) else image.filename
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved

img_name = os.path.relpath(image, self._data_dir) if self._data_dir \
else os.path.basename(image) if not self._raw_data_used else image.filename
name, extension = os.path.splitext(img_name)
image_properties = {
'name': name.replace('\\', '/'),
'extension': extension,
}
name, extension = os.path.splitext(img_name)
image_properties = {
'name': name.replace('\\', '/'),
'extension': extension,
}

width, height = img.width, img.height
orientation = img.getexif().get(274, 1)
if orientation > 4:
width, height = height, width
image_properties['width'] = width
image_properties['height'] = height
width, height = img.width, img.height
orientation = img.getexif().get(274, 1)
if orientation > 4:
width, height = height, width
image_properties['width'] = width
image_properties['height'] = height

if self._meta and img_name in self._meta:
image_properties['meta'] = self._meta[img_name]
if self._meta and img_name in self._meta:
image_properties['meta'] = self._meta[img_name]

if self._use_image_hash:
image_properties['checksum'] = md5_hash(img)
if self._use_image_hash:
image_properties['checksum'] = md5_hash(img)

yield image_properties
return image_properties
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved

def __iter__(self):
if not self.are_sources_filtered:
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
if self.stop is not None:
index_range = range(self.start, self.stop, self.step)
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
idx = 0
for image in self._sources:
if idx in index_range:
yield self._get_img_properties(image)
else:
yield dict()
idx += 1
elif self.start or self.step != 1:
idx = 0
for image in self._sources:
if idx >= self.start and not (idx - self._start) % self.step:
yield self._get_img_properties(image)
else:
yield dict()
else:
yield dict()
for image in self._sources:
yield self._get_img_properties(image)
else:
sources = iter(self._sources) if not self._is_generator_used else self._sources

assert self.start is not None
assert self.stop is not None
assert self.step is not None
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

index_range = range(self.start, self.stop, self.step)
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
for idx in range(self.stop):
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
if idx in index_range:
image = next(sources)
yield self._get_img_properties(image)
else:
yield dict()
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved

@property
def range_(self):
Expand Down Expand Up @@ -351,14 +394,16 @@ class _ManifestManager(ABC):
'type': 2,
}

def _json_item_is_valid(self, **state):
def _validate_json_item(self, **state) -> None:
for item in self._required_item_attributes:
if state.get(item, None) is None:
raise InvalidManifestError(
f"Invalid '{self.manifest.name}' file structure: "
f"'{item}' is required, but not found"
)

return None

def __init__(self, path, create_index, upload_dir=None):
self._manifest = _Manifest(path, upload_dir)
self._index = _Index(os.path.dirname(self._manifest.path))
Expand All @@ -383,8 +428,9 @@ def _parse_line(self, line):
offset = self._index[line]
manifest_file.seek(offset)
properties = manifest_file.readline()
# fixme: remove from common class
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
parsed_properties = ImageProperties(json.loads(properties))
self._json_item_is_valid(**parsed_properties)
self._validate_json_item(**parsed_properties)
return parsed_properties

def init_index(self):
Expand Down Expand Up @@ -424,8 +470,9 @@ def __iter__(self):
for idx, line_start in enumerate(self._index):
manifest_file.seek(line_start)
line = manifest_file.readline()
# fixme: remove from common class
item = ImageProperties(json.loads(line))
self._json_item_is_valid(**item)
self._validate_json_item(**item)
yield (idx, item)

@property
Expand Down Expand Up @@ -579,6 +626,13 @@ def __init__(self, manifest_path, upload_dir=None, create_index=True):
super().__init__(manifest_path, create_index, upload_dir)
setattr(self._manifest, 'TYPE', 'images')

def _validate_json_item(self, **state) -> None:
# empty json is allowed
if state:
super()._validate_json_item(**state)

return None

def link(self, **kwargs):
ReaderClass = DatasetImagesReader if not kwargs.get('DIM_3D', None) else Dataset3DImagesReader
self._reader = ReaderClass(**kwargs)
Expand Down Expand Up @@ -611,6 +665,13 @@ def create(self, content=None, _tqdm=None):

self.set_index()

def get_first_not_empty_item(self) -> ImageProperties:
for _, image in self:
if image:
return image

raise InvalidManifestError('Manifest is empty')
zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved

def partial_update(self, number, properties):
pass

Expand Down
Loading