Skip to content

Commit 18163e0

Browse files
committed
fix: Optimize memory and timeout handling for 50GB+ files
1 parent 3c5264f commit 18163e0

File tree

2 files changed

+43
-17
lines changed

2 files changed

+43
-17
lines changed

src/together/filemanager.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def download(
212212
),
213213
remaining_retries=MAX_RETRIES,
214214
stream=True,
215+
request_timeout=3600,
215216
)
216217

217218
try:
@@ -522,29 +523,47 @@ def _upload_parts_concurrent(
522523

523524
with ThreadPoolExecutor(max_workers=self.max_concurrent_parts) as executor:
524525
with tqdm(total=len(parts), desc="Uploading parts", unit="part") as pbar:
525-
future_to_part = {}
526-
527526
with open(file, "rb") as f:
528-
for part_info in parts:
527+
future_to_part = {}
528+
part_index = 0
529+
530+
# Submit initial batch limited by max_concurrent_parts
531+
for i in range(min(self.max_concurrent_parts, len(parts))):
532+
part_info = parts[part_index]
529533
f.seek((part_info["PartNumber"] - 1) * part_size)
530534
part_data = f.read(part_size)
531535

532536
future = executor.submit(
533537
self._upload_single_part, part_info, part_data
534538
)
535539
future_to_part[future] = part_info["PartNumber"]
536-
537-
# Collect results
538-
for future in as_completed(future_to_part):
539-
part_number = future_to_part[future]
540-
try:
541-
etag = future.result()
542-
completed_parts.append(
543-
{"part_number": part_number, "etag": etag}
544-
)
545-
pbar.update(1)
546-
except Exception as e:
547-
raise Exception(f"Failed to upload part {part_number}: {e}")
540+
part_index += 1
541+
542+
# Process completions and submit new parts (sliding window)
543+
while future_to_part:
544+
done_future = next(as_completed(future_to_part))
545+
part_number = future_to_part.pop(done_future)
546+
547+
try:
548+
etag = done_future.result()
549+
completed_parts.append(
550+
{"part_number": part_number, "etag": etag}
551+
)
552+
pbar.update(1)
553+
except Exception as e:
554+
raise Exception(f"Failed to upload part {part_number}: {e}")
555+
556+
# Submit next part if available
557+
if part_index < len(parts):
558+
part_info = parts[part_index]
559+
f.seek((part_info["PartNumber"] - 1) * part_size)
560+
part_data = f.read(part_size)
561+
562+
future = executor.submit(
563+
self._upload_single_part, part_info, part_data
564+
)
565+
future_to_part[future] = part_info["PartNumber"]
566+
part_index += 1
548567

549568
completed_parts.sort(key=lambda x: x["part_number"])
550569
return completed_parts

src/together/utils/files.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from traceback import format_exc
88
from typing import Any, Dict, List
99

10+
from tqdm import tqdm
1011

1112
from together.constants import (
1213
MAX_FILE_SIZE_GB,
@@ -363,14 +364,20 @@ def _check_utf8(file: Path) -> Dict[str, Any]:
363364
Dict[str, Any]: A dictionary with the results of the check.
364365
"""
365366
report_dict: Dict[str, Any] = {}
367+
366368
try:
369+
# Stream file in chunks to avoid loading entire file into memory
370+
chunk_size = 8192 # 8KB chunks
367371
with file.open(encoding="utf-8") as f:
368-
f.read()
372+
for chunk in iter(lambda: f.read(chunk_size), ""):
373+
pass # UTF-8 decoding happens automatically during read
374+
369375
report_dict["utf8"] = True
370376
except UnicodeDecodeError as e:
371377
report_dict["utf8"] = False
372378
report_dict["message"] = f"File is not UTF-8 encoded. Error raised: {e}."
373379
report_dict["is_check_passed"] = False
380+
374381
return report_dict
375382

376383

@@ -470,7 +477,7 @@ def _check_jsonl(file: Path, purpose: FilePurpose | str) -> Dict[str, Any]:
470477
with file.open() as f:
471478
idx = -1
472479
try:
473-
for idx, line in enumerate(f):
480+
for idx, line in tqdm(enumerate(f), desc="Validating file", unit=" lines"):
474481
json_line = json.loads(line)
475482

476483
if not isinstance(json_line, dict):

0 commit comments

Comments
 (0)