diff --git a/src/openjd/sessions/_session.py b/src/openjd/sessions/_session.py index cb051352..6cdac0d3 100644 --- a/src/openjd/sessions/_session.py +++ b/src/openjd/sessions/_session.py @@ -843,6 +843,63 @@ def run_task( # than after -- run() itself may end up setting the action state to FAILED. self._runner.run() + def _run_task_without_session_env( + self, + *, + step_script: StepScriptModel, + task_parameter_values: TaskParameterSet, + os_env_vars: Optional[dict[str, str]] = None, + log_task_banner: bool = True, + ) -> None: + """Private API to run a task within the session. + This method directly use os_env_vars passed in without applying additional session env setup. + """ + if self.state != SessionState.READY: + raise RuntimeError("Session must be in the READY state to run a task.") + + if log_task_banner: + log_section_banner(self._logger, "Running Task") + + if task_parameter_values: + self._logger.info( + "Parameter values:", + extra=LogExtraInfo(openjd_log_content=LogContent.PARAMETER_INFO), + ) + for name, value in task_parameter_values.items(): + self._logger.info( + f"{name}({str(value.type.value)}) = {value.value}", + extra=LogExtraInfo(openjd_log_content=LogContent.PARAMETER_INFO), + ) + + self._reset_action_state() + symtab = self._symbol_table(step_script.revision, task_parameter_values) + + # Evaluate environment variables + action_env_vars = dict[str, Optional[str]](self._process_env) # Make a copy + if os_env_vars: + action_env_vars.update(**os_env_vars) + + self._materialize_path_mapping(step_script.revision, action_env_vars, symtab) + self._runner = StepScriptRunner( + logger=self._logger, + user=self._user, + os_env_vars=action_env_vars, + session_working_directory=self.working_directory, + startup_directory=self.working_directory, + callback=self._action_callback, + script=step_script, + symtab=symtab, + session_files_directory=self.files_directory, + ) + # Sets the subprocess running. + # Returns immediately after it has started, or is running + self._action_state = ActionState.RUNNING + self._state = SessionState.RUNNING + # Note: This may fail immediately (e.g. if we cannot write embedded files to disk), + # so it's important to set the action_state to RUNNING before calling run(), rather + # than after -- run() itself may end up setting the action state to FAILED. + self._runner.run() + # ========================= # Helpers diff --git a/test/openjd/sessions/test_session.py b/test/openjd/sessions/test_session.py index e4490d72..c74e75ca 100644 --- a/test/openjd/sessions/test_session.py +++ b/test/openjd/sessions/test_session.py @@ -3459,3 +3459,408 @@ def test_multiple_different_redacted_env_vars( log_content = "\n".join(caplog.messages) assert "secret123" not in log_content assert "mysecret123" not in log_content + + +class TestSessionRunTaskWithoutSessionEnv_2023_09: # noqa: N801 + """Testing running tasks without session environment variables with the 2023-09 schema.""" + + @staticmethod + @pytest.fixture + def fix_basic_task_script(python_exe: str) -> StepScript_2023_09: + return StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("{{ Task.File.Foo }}")], + ) + ), + embeddedFiles=[ + EmbeddedFileText_2023_09( + name="Foo", + type=EmbeddedFileTypes_2023_09.TEXT, + data=DataString_2023_09( + "import time; time.sleep(0.5); print('{{ Task.Param.P }} {{ Task.RawParam.P }}'); print('{{ Param.J }} {{ RawParam.J }}')" + ), + ) + ], + ) + + @staticmethod + @pytest.fixture + def fix_foo_baz_environment() -> Environment_2023_09: + return Environment_2023_09( + name="FooBazEnvironment", + variables={ + "FOO": EnvironmentVariableValueString_2023_09("bar"), + "BAZ": EnvironmentVariableValueString_2023_09("qux"), + }, + ) + + def test_run_task_without_session_env( + self, caplog: pytest.LogCaptureFixture, fix_basic_task_script: StepScript_2023_09 + ) -> None: + # GIVEN + # Crafting a StepScript that ensures that references both Job & Task parameters. + # This ensures that we are correctly constructing the symbol table for the run. + session_id = uuid.uuid4().hex + job_params = {"J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue")} + task_params = {"P": ParameterValue(type=ParameterValueType.STRING, value="Pvalue")} + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._run_task_without_session_env( + step_script=fix_basic_task_script, task_parameter_values=task_params + ) + + # THEN + assert session.state == SessionState.RUNNING + assert session.action_status == ActionStatus(state=ActionState.RUNNING) + # Wait for the process to exit + while session.state == SessionState.RUNNING: + time.sleep(0.1) + assert session.state == SessionState.READY + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert "Jvalue Jvalue" in caplog.messages + assert "Pvalue Pvalue" in caplog.messages + assert "--------- Running Task" in caplog.messages + + def test_run_task_without_session_env_no_log_banners( + self, caplog: pytest.LogCaptureFixture, fix_basic_task_script: StepScript_2023_09 + ) -> None: + # GIVEN + # This ensures that the log_task_banner argument is correctly controlling the task banner logging. + session_id = uuid.uuid4().hex + job_params = {"J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue")} + task_params = {"P": ParameterValue(type=ParameterValueType.STRING, value="Pvalue")} + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._run_task_without_session_env( + step_script=fix_basic_task_script, + task_parameter_values=task_params, + log_task_banner=False, + ) + + # THEN + assert "--------- Running Task" not in caplog.messages + + def test_run_task_without_session_env_with_env_vars( + self, caplog: pytest.LogCaptureFixture, python_exe: str + ) -> None: + # GIVEN + step_script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("{{ Task.File.Foo }}")], + ) + ), + embeddedFiles=[ + EmbeddedFileText_2023_09( + name="Foo", + type=EmbeddedFileTypes_2023_09.TEXT, + data=DataString_2023_09( + 'import time; import os; time.sleep(0.5); print(f\'{os.environ["SESSION_VAR"]} {os.environ["ACTION_VAR"]}\')' + ), + ) + ], + ) + + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + task_params = dict[str, ParameterValue]() + session_env_vars = {"SESSION_VAR": "session_value"} + action_env_vars = {"ACTION_VAR": "action_value"} + with Session( + session_id=session_id, job_parameter_values=job_params, os_env_vars=session_env_vars + ) as session: + # WHEN + session._run_task_without_session_env( + step_script=step_script, + task_parameter_values=task_params, + os_env_vars=action_env_vars, + ) + + # THEN + assert session.state == SessionState.RUNNING + assert session.action_status == ActionStatus(state=ActionState.RUNNING) + # Wait for the process to exit + while session.state == SessionState.RUNNING: + time.sleep(0.1) + assert session.state == SessionState.READY + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert "session_value action_value" in caplog.messages + + @pytest.mark.parametrize( + "state", + [ + pytest.param(state, id=state.value) + for state in SessionState + if state != SessionState.READY + ], + ) + def test_cannot_run_not_ready(self, state: SessionState, python_exe: str) -> None: + # This is checking that we cannot run a task unless the Session is READY + + # GIVEN + # Crafting a EnvironmentScript that ensures that references to Job parameters. + # This ensures that we are correctly constructing the symbol table for the run. + script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("-c"), ArgString_2023_09("print('hi')")], + ) + ), + ) + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + task_params = dict[str, ParameterValue]() + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._state = state + + # THEN + with pytest.raises(RuntimeError): + session._run_task_without_session_env( + step_script=script, task_parameter_values=task_params + ) + + def test_run_task_without_session_env_fail_early(self, python_exe: str) -> None: + # Testing a task that fails before running. + # This'll fail because we're referencing a Task parameter that doesn't exist. + + # GIVEN + session_id = uuid.uuid4().hex + job_params = {"J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue")} + task_params = dict[str, ParameterValue]() + step_script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("{{ Task.File.Foo }}")], + ) + ), + embeddedFiles=[ + EmbeddedFileText_2023_09( + name="Foo", + type=EmbeddedFileTypes_2023_09.TEXT, + data=DataString_2023_09( + "import time; time.sleep(0.5); print('{{ Task.Param.P }}'); print('{{ Param.J }}')" + ), + ) + ], + ) + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._run_task_without_session_env( + step_script=step_script, task_parameter_values=task_params + ) + + # THEN + assert session.state == SessionState.READY_ENDING + assert session.action_status == ActionStatus( + state=ActionState.FAILED, + fail_message="Error resolving format string: Failed to parse interpolation expression at [37, 55]. Expression: Task.Param.P . Reason: Expression failed validation: Task.Param.P has no value.", + ) + + def test_run_task_without_session_env_fail_run(self, python_exe: str) -> None: + # Testing a task that fails while running + + # GIVEN + script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("{{ Task.File.Foo }}")], + ) + ), + embeddedFiles=[ + EmbeddedFileText_2023_09( + name="Foo", + type=EmbeddedFileTypes_2023_09.TEXT, + data=DataString_2023_09("import sys; sys.exit(1)"), + ) + ], + ) + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + task_params = dict[str, ParameterValue]() + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._run_task_without_session_env( + step_script=script, task_parameter_values=task_params + ) + # Wait for the process to exit + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY_ENDING + assert session.action_status == ActionStatus(state=ActionState.FAILED, exit_code=1) + + def test_no_task_run_after_fail(self, fix_basic_task_script: StepScript_2023_09) -> None: + # Testing that we cannot run a task if we've had a failure. + # This'll fail because we're referencing a Task parameter that doesn't exist. + + # GIVEN + session_id = uuid.uuid4().hex + job_params = {"J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue")} + task_params = dict[str, ParameterValue]() + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._state = SessionState.READY_ENDING + + # THEN + with pytest.raises(RuntimeError): + session._run_task_without_session_env( + step_script=fix_basic_task_script, task_parameter_values=task_params + ) + + # Testing os_env_vars behaviour which is different from run_task + @pytest.mark.parametrize( + "os_env_vars,expected_foo,expected_baz,expected_custom", + [ + (None, "NOT_SET", "NOT_SET", "NOT_SET"), # No os_env_vars, entered env ignored + ( + {"CUSTOM": "custom_value"}, + "NOT_SET", + "NOT_SET", + "custom_value", + ), # Only os_env_vars used + ( + {"FOO": "os_value", "CUSTOM": "custom_value"}, + "os_value", + "NOT_SET", + "custom_value", + ), # os_env_vars override + ], + ) + def test_run_task_without_session_env_ignores_entered_environments( + self, + caplog: pytest.LogCaptureFixture, + python_exe: str, + fix_foo_baz_environment: Environment_2023_09, + os_env_vars: Optional[dict[str, str]], + expected_foo: str, + expected_baz: str, + expected_custom: str, + ) -> None: + """Test that entered environment variables are ignored and only os_env_vars are used.""" + # GIVEN + script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ArgString_2023_09("{{ Task.File.Foo }}")], + ) + ), + embeddedFiles=[ + EmbeddedFileText_2023_09( + name="Foo", + type=EmbeddedFileTypes_2023_09.TEXT, + data=DataString_2023_09( + "import os; print(f\"FOO={os.environ.get('FOO', 'NOT_SET')}\"); print(f\"BAZ={os.environ.get('BAZ', 'NOT_SET')}\"); print(f\"CUSTOM={os.environ.get('CUSTOM', 'NOT_SET')}\")" + ), + ) + ], + ) + + with Session(session_id=uuid.uuid4().hex, job_parameter_values={}) as session: + session.enter_environment(environment=fix_foo_baz_environment) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # WHEN + session._run_task_without_session_env( + step_script=script, + task_parameter_values={}, + os_env_vars=os_env_vars, + ) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert f"FOO={expected_foo}" in caplog.messages + assert f"BAZ={expected_baz}" in caplog.messages + assert f"CUSTOM={expected_custom}" in caplog.messages + + def test_run_task_without_session_env_includes_constructor_env_vars( + self, caplog: pytest.LogCaptureFixture, python_exe: str + ) -> None: + """Test that session constructor env vars are always included in the base process environment.""" + # GIVEN + script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ + ArgString_2023_09("-c"), + ArgString_2023_09( + "import os; print(f\"CONSTRUCTOR_VAR={os.environ.get('CONSTRUCTOR_VAR', 'NOT_SET')}\")" + ), + ], + ) + ), + ) + + with Session( + session_id=uuid.uuid4().hex, + job_parameter_values={}, + os_env_vars={"CONSTRUCTOR_VAR": "constructor_value"}, + ) as session: + # WHEN + session._run_task_without_session_env(step_script=script, task_parameter_values={}) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert "CONSTRUCTOR_VAR=constructor_value" in caplog.messages + + def test_run_task_without_session_env_ignores_multiple_environments( + self, + caplog: pytest.LogCaptureFixture, + python_exe: str, + ) -> None: + """Test that multiple entered environments are ignored.""" + # GIVEN - Create environments with overlapping variables + environments = [ + Environment_2023_09( + name="FirstEnvironment", + variables={"FOO": EnvironmentVariableValueString_2023_09("first_value")}, + ), + Environment_2023_09( + name="SecondEnvironment", + variables={"FOO": EnvironmentVariableValueString_2023_09("second_value")}, + ), + ] + + script = StepScript_2023_09( + actions=StepActions_2023_09( + onRun=Action_2023_09( + command=CommandString_2023_09(python_exe), + args=[ + ArgString_2023_09("-c"), + ArgString_2023_09( + "import os; print(f\"FOO={os.environ.get('FOO', 'NOT_SET')}\")" + ), + ], + ) + ), + ) + + with Session(session_id=uuid.uuid4().hex, job_parameter_values={}) as session: + # Enter multiple environments + for env in environments: + session.enter_environment(environment=env) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # WHEN + session._run_task_without_session_env(step_script=script, task_parameter_values={}) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - All entered environment variables should be ignored + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert "FOO=NOT_SET" in caplog.messages