Skip to content
2 changes: 2 additions & 0 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(
stagedir: str,
cwlVersion: str,
container_engine: str,
dockerstagedir: str = None,
) -> None:
"""Initialize this Builder."""
super().__init__()
Expand Down Expand Up @@ -211,6 +212,7 @@ def __init__(
self.outdir = outdir
self.tmpdir = tmpdir
self.stagedir = stagedir
self.dockerstagedir = dockerstagedir

self.cwlVersion = cwlVersion

Expand Down
5 changes: 3 additions & 2 deletions cwltool/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ def append_volume(
options.append("readonly")
output = StringIO()
csv.writer(output).writerow(options)
mount_arg = output.getvalue().strip()
runtime.append(f"--mount={mount_arg}")
mount_arg = f"--mount={output.getvalue().strip()}"
if mount_arg not in runtime:
runtime.append(mount_arg)
# Unlike "--volume", "--mount" will fail if the volume doesn't already exist.
if not os.path.exists(source):
os.makedirs(source)
Expand Down
68 changes: 65 additions & 3 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ def run(
else:
if not os.path.exists(self.tmpdir):
os.makedirs(self.tmpdir)

self._setup(runtimeContext)

stage_files(
Expand Down Expand Up @@ -696,19 +695,27 @@ def add_volumes(
) -> None:
"""Append volume mappings to the runtime option list."""
container_outdir = self.builder.outdir
dockerstagedir = self.builder.dockerstagedir
if dockerstagedir:
pathmapper.update(dockerstagedir, dockerstagedir, self.builder.stagedir, "Directory", True)
for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
host_outdir_tgt = None # type: Optional[str]
if vol.target.startswith(container_outdir + "/"):
host_outdir_tgt = os.path.join(
self.outdir, vol.target[len(container_outdir) + 1 :]
)
if not host_outdir_tgt and not any_path_okay:
if not host_outdir_tgt and not any_path_okay and key != dockerstagedir:
raise WorkflowException(
"No mandatory DockerRequirement, yet path is outside "
"the designated output directory, also know as "
"$(runtime.outdir): {}".format(vol)
)
if vol.type in ("File", "Directory"):
if vol.type == "File":
if dockerstagedir and vol.resolved.startswith(dockerstagedir):
pass
else:
self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
elif vol.type == "Directory":
self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
elif vol.type == "WritableFile":
self.add_writable_file_volume(
Expand Down Expand Up @@ -835,6 +842,25 @@ def run(
env = dict(os.environ)
(runtime, cidfile) = self.create_runtime(env, runtimeContext)

# if command is larger than this, we might exceed system limits
# 2097150 bytes is the Ubuntu system default
# check with: echo $(( $(ulimit -s)*1024 / 4 ))
# or: getconf ARG_MAX
# TODO: create a cross-platform function to check limit where it's run
if len(''.join(runtime)) >= 2097152 - 2:
if runtimeContext.singularity:
# TODO: write the singularity equivalent
# copy_these_into_container, new_runtime = self.filter_out_singularity_image_file_inputs(runtime)
# if copy_these_into_container:
# runtime = new_runtime
# img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container)
pass
else:
copy_these_into_container, new_runtime = self.filter_out_docker_image_file_inputs(runtime)
if copy_these_into_container:
runtime = new_runtime
img_id = self.bake_inputs_into_docker_container(str(img_id), copy_these_into_container)

runtime.append(str(img_id))
monitor_function = None
if cidfile:
Expand All @@ -848,6 +874,42 @@ def run(
monitor_function = functools.partial(self.process_monitor)
self._execute(runtime, env, runtimeContext, monitor_function)

def bake_inputs_into_docker_container(self, image_id, inputs):
    """
    Build a derived Docker image that has the given input files copied in.

    A Dockerfile starting ``FROM image_id`` is generated with one ``COPY``
    instruction per entry of ``inputs``, and ``docker build`` is run on it.

    :param image_id: image reference used in the ``FROM`` instruction.
    :param inputs: iterable of dicts, each with a ``"src"`` key (path of the
        file on the host) and a ``"dst"`` key (destination path inside the
        image).
    :return: the tag of the newly built image, of the form ``<uuid4>:cwl``.
    :raises WorkflowException: if ``docker build`` exits with a non-zero
        status.
    """
    dockerfile_lines = [f'FROM {image_id}']
    image_tag = f'{uuid.uuid4()}:cwl'

    # Files are copied into a scratch directory so that they are inside the
    # docker build context; the directory is removed once the build is done.
    with tempfile.TemporaryDirectory() as staging_dir:
        for entry in inputs:
            # A random name avoids collisions between inputs that share a
            # basename.
            staged_file_uuid = str(uuid.uuid4())
            shutil.copy(entry['src'], os.path.join(staging_dir, staged_file_uuid))
            dockerfile_lines.append(f'COPY {staged_file_uuid} {entry["dst"]}')

        dockerfile = os.path.join(staging_dir, 'Dockerfile')
        with open(dockerfile, 'w') as f:
            f.write('\n'.join(dockerfile_lines))

        _logger.debug("Dockerfile for %s: %s", image_tag, dockerfile_lines)
        try:
            subprocess.run(
                ['docker', 'build', '-f', dockerfile, '-t', image_tag, '.'],
                cwd=staging_dir,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                check=True,
            )
        except subprocess.CalledProcessError as err:
            # Without this, a failed build would return the tag of an image
            # that does not exist and the error would only surface later.
            raise WorkflowException(
                "docker build of {} failed with exit code {}: {}".format(
                    image_tag,
                    err.returncode,
                    (err.stderr or b'').decode(errors='replace').strip(),
                )
            ) from err
    return image_tag

def filter_out_docker_image_file_inputs(self, runtime, outdir=None):
    """
    Split bind-mounted input files out of a container runtime argument list.

    Every ``--mount=type=bind,source=<src>,target=<dst>[,readonly]`` argument
    whose source is an existing regular file and whose target does not start
    with ``outdir`` is removed from the argument list and reported
    separately. All other arguments are kept in their original order.

    :param runtime: list of command line arguments.
    :param outdir: target path prefix whose mounts must be kept as mounts.
        Defaults to ``self.builder.outdir`` when omitted.
    :return: a tuple ``(copy_these_into_container, new_runtime)`` where the
        first item is a list of ``{'src': ..., 'dst': ...}`` dicts and the
        second is the filtered argument list.
    """
    if outdir is None:
        outdir = self.builder.outdir

    mount_prefix = '--mount=type=bind,source='
    readonly_suffix = ',readonly'
    new_runtime = []
    copy_these_into_container = []
    for arg in runtime:
        if arg.startswith(mount_prefix):
            # partition() never raises; an argument without ",target=" is
            # simply left in place instead of aborting with ValueError.
            src, sep, dst = arg[len(mount_prefix):].partition(',target=')
            if dst.endswith(readonly_suffix):
                dst = dst[:-len(readonly_suffix)]
            if sep and os.path.isfile(src) and not dst.startswith(outdir):
                copy_these_into_container.append({'src': src, 'dst': dst})
                continue
        new_runtime.append(arg)
    return copy_these_into_container, new_runtime

def docker_monitor(
self,
cidfile: str,
Expand Down
4 changes: 3 additions & 1 deletion cwltool/pathmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None:
stagedir = self.stagedir
for fob in referenced_files:
if self.separateDirs:
stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
location = fob['location'] if not fob['location'].startswith('file://') else fob['location'][len('file://'):]
stagedir = os.path.join(self.stagedir, os.path.basename(os.path.dirname(location)))
# stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
self.visit(
fob,
stagedir,
Expand Down
38 changes: 29 additions & 9 deletions cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,10 @@ def checkRequirements(
checkRequirements(entry2, supported_process_requirements)


def stage_files(
pathmapper: PathMapper,
stage_func: Optional[Callable[[str, str], None]] = None,
ignore_writable: bool = False,
symlink: bool = True,
secret_store: Optional[SecretStore] = None,
fix_conflicts: bool = False,
) -> None:
"""Link or copy files to their targets. Create them as needed."""
def check_pathmapper_conflicts(pathmapper: PathMapper, fix_conflicts: bool = False) -> None:
"""
All pathmapper resolved file paths should be unique. If any conflict, this will error early or fix them.
"""
targets = {} # type: Dict[str, MapperEnt]
for key, entry in pathmapper.items():
if "File" not in entry.type:
Expand All @@ -285,6 +280,18 @@ def stage_files(
% (targets[entry.target].resolved, entry.resolved, entry.target)
)


def stage_files(
pathmapper: PathMapper,
stage_func: Optional[Callable[[str, str], None]] = None,
ignore_writable: bool = False,
symlink: bool = True,
secret_store: Optional[SecretStore] = None,
fix_conflicts: bool = False,
) -> None:
"""Link or copy files to their targets. Create them as needed."""
check_pathmapper_conflicts(pathmapper, fix_conflicts)

for key, entry in pathmapper.items():
if not entry.staged:
continue
Expand Down Expand Up @@ -771,6 +778,18 @@ def _init_job(
% (self.metadata.get("cwlVersion"), INTERNAL_VERSION)
)

if os.environ.get('GROUP_STAGING') or True:
dockerstagedir = runtime_context.get_stagedir()
for input_keyname, input in joborder.items():
if input.get('location') and input.get('class') == 'File':
location = str(input.get('location'))
location = location if not location.startswith('file://') else location[len('file://'):]
unique_staging_dir = os.path.join(dockerstagedir, str(uuid.uuid4()))
os.makedirs(unique_staging_dir, exist_ok=True)
new_uri = os.path.join(unique_staging_dir, input['basename'])
shutil.copyfile(location, new_uri)
input['location'] = new_uri

job = copy.deepcopy(joborder)

make_fs_access = getdefault(runtime_context.make_fs_access, StdFsAccess)
Expand Down Expand Up @@ -911,6 +930,7 @@ def inc(d): # type: (List[int]) -> None
stagedir,
cwl_version,
self.container_engine,
dockerstagedir,
)

bindings.extend(
Expand Down
12 changes: 6 additions & 6 deletions cwltool/singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ def get_from_requirements(
def append_volume(
    runtime: List[str], source: str, target: str, writable: bool = False
) -> None:
    """
    Add a ``--bind=<source>:<target>`` option to the runtime argument list.

    The option is appended as a single argument, and only if an identical
    argument is not already present in ``runtime``.

    :param runtime: argument list that is extended in place.
    :param source: path on the host side of the bind.
    :param target: path on the container side of the bind.
    :param writable: when false, ``:ro`` is added to the bind specification.
    """
    mount_arg = f"--bind={source}:{target}"
    if not writable:
        # Mounts are writable by default, so 'rw' is optional and not
        # supported (due to a bug) in some 3.6 series releases.
        mount_arg += ":ro"
    if mount_arg not in runtime:
        runtime.append(mount_arg)

def add_file_or_directory_volume(
self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
Expand Down