Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ScaFFold/utils/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
# Masks are values 0 <= x <= n_categories
MASK_DTYPE = np.uint16
# Volumes/img are 0 <= x <= 1
VOLUME_DTYPE = np.float32
VOLUME_DTYPE_NAME = "float32"
VOLUME_NP_DTYPE = getattr(np, VOLUME_DTYPE_NAME)
VOLUME_TORCH_DTYPE = getattr(torch, VOLUME_DTYPE_NAME)
VOLUME_DTYPE = VOLUME_NP_DTYPE

# Shared AMP dtype selection for torch.autocast.
AMP_DTYPE = torch.bfloat16
37 changes: 25 additions & 12 deletions ScaFFold/utils/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from ScaFFold.utils.checkpointing import CheckpointManager
from ScaFFold.utils.data_loading import FractalDataset, SpatialShardSpec
from ScaFFold.utils.data_types import AMP_DTYPE, VOLUME_DTYPE
from ScaFFold.utils.data_types import AMP_DTYPE, VOLUME_TORCH_DTYPE
from ScaFFold.utils.dice_score import compute_sharded_dice
from ScaFFold.utils.distributed import get_local_rank, get_world_rank, get_world_size

Expand Down Expand Up @@ -349,18 +349,18 @@ def cleanup_or_resume(self):
with open(self.outfile_path, "a", newline="") as outfile:
outfile.write(",".join(headers) + "\n")

def _truncate_stats_file(self, start_epoch):
def _truncate_stats_file(self, start_epoch, path=None):
"""
Scans the stats file and truncates it at the first occurrence of
an epoch >= start_epoch. This is O(1) memory and safe for large logs.
"""
self.log.info(
f"Truncating {self.outfile_path} to remove epochs >= {start_epoch}"
)
if path is None:
path = self.outfile_path
self.log.info(f"Truncating {path} to remove epochs >= {start_epoch}")

try:
# Open in read+update mode ('r+') to allow seeking and truncating
with open(self.outfile_path, "r+") as f:
with open(path, "r+") as f:
header = f.readline()
if not header:
return
Expand Down Expand Up @@ -401,7 +401,7 @@ def _truncate_stats_file(self, start_epoch):
pass

except Exception as e:
self.log.warning(f"Failed to truncate stats file: {e}")
self.log.warning(f"Failed to truncate stats file {path}: {e}")

def _get_memsize(self, tensor, tensor_label: str, verbosity: int = 0):
"""Log size of tensor in memory"""
Expand Down Expand Up @@ -436,7 +436,7 @@ def warmup(self):

images = images.to(
device=self.device,
dtype=VOLUME_DTYPE,
dtype=VOLUME_TORCH_DTYPE,
memory_format=torch.channels_last_3d,
non_blocking=True,
)
Expand Down Expand Up @@ -604,14 +604,18 @@ def train(self):
disable=True if self.world_rank != 0 else False,
) as pbar:
begin_code_region("batch_loop")
for batch in self.train_loader:
for batch_idx, batch in enumerate(self.train_loader):
time_minibatch = batch_idx == 0 and self.world_rank == 0
if time_minibatch:
minibatch_start_time = time.perf_counter()

# Load initial samples and labels
images, true_masks = batch["image"], batch["mask"]

begin_code_region("image_to_device")
images = images.to(
device=self.device,
dtype=VOLUME_DTYPE,
dtype=VOLUME_TORCH_DTYPE,
memory_format=torch.channels_last_3d, # NDHWC (channels last) vs NCDHW (channels first)
non_blocking=True,
)
Expand Down Expand Up @@ -724,6 +728,13 @@ def train(self):
self.global_step += 1
# Stay on GPU
epoch_loss += loss.detach()
if time_minibatch:
# This sync has some potential performance impact
# TODO: Would be better to measure this with Caliper, which uses CUDA events.
torch.cuda.synchronize(self.device)
minibatch_time_s = (
time.perf_counter() - minibatch_start_time
)
end_code_region("update_loss")
end_code_region("batch_loop")

Expand All @@ -749,7 +760,9 @@ def train(self):
self.config.n_categories,
self.config._parallel_strategy,
)
dice_info = torch.tensor([dice_sum, numsamples], dtype=VOLUME_DTYPE)
dice_info = torch.tensor(
[dice_sum, numsamples], dtype=VOLUME_TORCH_DTYPE
)
if self.config.dist:
dice_info = dice_info.to(device=self.device)
torch.distributed.all_reduce(
Expand Down Expand Up @@ -789,7 +802,7 @@ def train(self):
)
outfile.flush()
print(
f"Epoch {epoch} completed in {epoch_duration} seconds. Total train time so far: {time.time() - start}"
f"Epoch {epoch} completed in {epoch_duration} seconds. Total train time so far: {time.time() - start}. Rank 0 first batch minibatch_time_s={minibatch_time_s:.6f}."
)

#
Expand Down
Loading