Skip to content

gh-136190: dis.py: No line number in case of an artificial bytecode instruction #136923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Lib/dis.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,15 +768,27 @@ def _get_instructions_bytes(code, linestarts=None, line_offset=0, co_positions=N
starts_line = False
local_line_number = None
line_number = None
last_line_number = None
for offset, start_offset, op, arg in _unpack_opargs(original_code):
if linestarts is not None:
starts_line = offset in linestarts
starts_line = False
if offset in linestarts:
# if linestarts[offset] is None, we don't start a new line
# (JUMP_FORWARD, etc.)
# same if last_line_number equals linestarts[offset]
if linestarts[offset]:
if last_line_number and linestarts[offset] == last_line_number:
starts_line = False
else:
starts_line = True
if starts_line:
local_line_number = linestarts[offset]
if local_line_number is not None:
line_number = local_line_number + line_offset
else:
line_number = None
if line_number is not None:
last_line_number = line_number
positions = Positions(*next(co_positions, ()))
deop = _deoptop(op)
op = code[offset]
Expand Down
33 changes: 33 additions & 0 deletions test_bugs_sprint_ep2025/inline_bug_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from dis import dis, Bytecode
import inspect

source = """
def func():
x, y = 0, 1
z = (x or 1) if y else 1
print(z)
"""

# source = """
# def func():
# z = 0.1
# if z:
# x, y = 0, 1
# else:
# x, y = 1, 0
# print(x, y)
# """

func = compile(source, "inline_bug_report.py", "exec", optimize=2)

print(dis(func))

# for name, value in inspect.getmembers(func.__code__):
# print(name, value)

# print("-- lines are --")
# lines = [line for line in func.__code__.co_lines()]
# print(lines)

# for code in Bytecode(func):
# print(f"{code.line_number} {code.opcode:06x} {code.opname}")
138 changes: 138 additions & 0 deletions test_bugs_sprint_ep2025/race_condition_linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# N.B.: We apply the monkeypatch before subprocess is imported because subprocess will
# hold strong references to os.waitpid.
from __future__ import annotations

import os
import sys
import textwrap
import traceback
from functools import wraps

orig_waitpid = os.waitpid
orig_kill = os.kill
freed_pids = set[int]()


@wraps(orig_waitpid)
def waitpid(pid: int, options: int, /) -> tuple[int, int]:
print(f"--DBG: start waitpid({pid!r}, {options!r}) @")
print(
textwrap.indent(
"".join(traceback.extract_stack(sys._getframe(1), limit=2).format()),
prefix=" " * (-2 + len("--DBG: ")),
),
end="",
)
try:
res = orig_waitpid(pid, options)
except BaseException as exc:
print(f"--DBG: finish waitpid({pid!r}, {options!r}) -> {exc!r}")
raise
else:
res_pid, status = res
if res_pid != 0:
freed_pids.add(res_pid)
print(f"--DBG: finish waitpid({pid!r}, {options!r}) = {res!r}")
return res


@wraps(orig_kill)
def kill(pid: int, sig: int, /) -> None:
print(f"--DBG: kill({pid}, {sig})")
if pid in freed_pids:
raise ValueError(
"caller is trying to signal an already-freed PID! did a site call waitpid without telling the sites with references to that PID about it?"
)
return orig_kill(pid, sig)


os.waitpid = waitpid
os.kill = kill

assert "subprocess" not in sys.modules

import asyncio
import subprocess
from signal import Signals as Signal
from typing import Literal
from typing import assert_never


async def main() -> None:
_watcher_case: Literal["_PidfdChildWatcher", "_ThreadedChildWatcher"]
if sys.version_info >= (3, 14):
_watcher = asyncio.get_running_loop()._watcher # type: ignore[attr-defined]
if isinstance(_watcher, asyncio.unix_events._PidfdChildWatcher): # type: ignore[attr-defined]
_watcher_case = "_PidfdChildWatcher"
elif isinstance(_watcher, asyncio.unix_events._ThreadedChildWatcher): # type: ignore[attr-defined]
_watcher_case = "_ThreadedChildWatcher"
else:
raise NotImplementedError()
else:
_watcher = asyncio.get_child_watcher()
if isinstance(_watcher, asyncio.PidfdChildWatcher):
_watcher_case = "_PidfdChildWatcher"
elif isinstance(_watcher, asyncio.ThreadedChildWatcher):
_watcher_case = "_ThreadedChildWatcher"
else:
raise NotImplementedError()
print(f"{_watcher_case = !r}")

process = await asyncio.create_subprocess_exec(
"python",
"-c",
"import time; time.sleep(1)",
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
print(f"{process.pid = !r}")

process.send_signal(Signal.SIGKILL)

# This snippet is contrived, in order to make this snippet hit the race condition
# consistently for reproduction & testing purposes.
if _watcher_case == "_PidfdChildWatcher":
os.waitid(os.P_PID, process.pid, os.WEXITED | os.WNOWAIT)
# Or alternatively, time.sleep(0.1).

# On the next loop cycle asyncio will select on the pidfd and append the reader
# callback:
await asyncio.sleep(0)
# On the next loop cycle the reader callback will run, calling (a) waitpid
# (freeing the PID) and (b) call_soon_threadsafe(transport._process_exited):
await asyncio.sleep(0)

# The _PidfdChildWatcher has now freed the PID but hasn't yet told the
# asyncio.subprocess.Process or the subprocess.Popen about this
# (call_soon_threadsafe).
elif _watcher_case == "_ThreadedChildWatcher":
if (thread := _watcher._threads.get(process.pid)) is not None: # type: ignore[attr-defined]
thread.join()
# Or alternatively, time.sleep(0.1).

# The _ThreadedChildWatcher has now freed the PID but hasn't yet told the
# asyncio.subprocess.Process or the subprocess.Popen about this
# (call_soon_threadsafe).
else:
assert_never(_watcher_case)

# The watcher has now freed the PID but hasn't yet told the
# asyncio.subprocess.Process or the subprocess.Popen that the PID they hold a
# reference to has been freed externally!
#
# I think these two things need to happen atomically.

try:
process.send_signal(Signal.SIGKILL)
except ProcessLookupError:
pass


# Pretend we don't have pidfd support
# if sys.version_info >= (3, 14):
# asyncio.unix_events.can_use_pidfd = lambda: False # type: ignore[attr-defined]
# else:
# asyncio.set_child_watcher(asyncio.ThreadedChildWatcher())

asyncio.run(main())
Empty file.
49 changes: 49 additions & 0 deletions test_bugs_sprint_ep2025/understand_shutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import subprocess
from subprocess import Popen
import shlex

print("""\ncalling 'shlex.quote("for")'""")
subprocess.call(shlex.quote("for"), shell=True)

print("""\ncalling 'shlex.quote("'for'")'""")
subprocess.call(shlex.quote("'for'"), shell=True)

print("""\ncalling "'for'" """)
subprocess.call("'for'", shell=True, env={'PATH': '.'})

print("""\ncalling "for" """)
subprocess.call("for", shell=True, env={'PATH': '.'})

# import os, shlex, shutil, subprocess
# open("do", "w").write("#!/bin/sh\necho Something is being done...")

# os.chmod("do", 0o700)

# subprocess.call(shlex.quote("'./my_command'"), shell=True)
# subprocess.call("'my_command'", shell=True, env={'PATH': '.'})
# subprocess.run(shlex.quote("do"), shell=True, env={'PATH': '.'})

# print(shlex.quote("my_command"))

2
# p = Popen(shlex.split("mycommand"), shell=False, executable="/bin/bash")
# print(p)

# test = shlex.quote("done")
# print(test)

# class MyError(Exception):
# def __init__(self):
# print("Hello")


# class SomeProcessError(MyError):
# def __init__(self, returncode):
# self.returncode = returncode

# def __str__(self):
# return f"Died with returncode: {self.returncode}"

# raise SomeProcessError(3)


Loading