Introduce update_aliases #1780

Open · wants to merge 16 commits into main
Conversation

@beverlylytle (Collaborator) commented Feb 18, 2025

In an abstract sense, Thunder views PyTorch programs as a directed acyclic graph (DAG) whose nodes represent elemental functions and whose edges represent the composition of those functions. Execution of the program can then be viewed as a waterfall execution of the nodes. If the execution of each node is assumed to have no side effects, then executing the nodes in the order given by any topological sort is valid. However, in-place operations do have side effects, and reordering the execution of such nodes may yield incorrect results. These side effects may also propagate further than expected through views of tensors. This PR introduces a new elemental function, update_aliases, which introduces new edges into the DAG. When inserted judiciously, these new edges prevent the reordering of nodes involving tensors that have been affected by in-place operations. The in-place operations themselves are left... in place. Because of this, and because of nvFuser's current approach to in-place operations, new fusion breaks are introduced.
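For a concrete picture of the hazard, here is a minimal, hypothetical example (not taken from the PR's tests) where a view carries an in-place side effect, so that hoisting the final sum above the add_ would change the result:

import torch

def f(x):
    y = x.view(-1)  # y aliases x's storage
    x.add_(1)       # side effect: y's values change too
    return y.sum()  # correct only if this runs after the add_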

Currently, this change is hidden behind the compile option flag skip_inplace_alias_updates, which defaults to True. It has only been tested with skip_inplace_functionalization=True, as the interaction between the two approaches has not been fleshed out. Only the forward execution is tested here, but I have hopes (and some experimental evidence) that the backward execution largely works as well.
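For reference, enabling the new pass looks like this (mirroring the In [4] repro later in this thread; setting skip_inplace_alias_updates=False turns the alias-update pass on):

import torch
import thunder

def foo(x):
    x.add_(1)
    return x

jfoo = thunder.jit(foo, fusion_type="dataflow", skip_inplace_alias_updates=False, skip_inplace_functionalization=True)
jfoo(torch.randn(3, 3, device="cuda"))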

@@ -2299,6 +2312,19 @@ def iter_bound_symbols(bound_symbols):
yield symbol


def get_first_proxy(proxies) -> Proxy | None:
Collaborator:

So we need to merge #1755 first. I'll check if it's easy to pull it out from the current PR stack.

@@ -102,6 +102,12 @@ def can_execute(self, bsym: BoundSymbol) -> bool:
def can_fuse(self, bsym: BoundSymbol) -> bool:
sym = bsym.sym

# !!! this is probably horribly wrong, please help
Collaborator:

One example where we need to fuse copies is efficient batch_norm:

def test_batch_norm(benchmark, executor: Callable, compute_type: ComputeType):

What problems did you see?

Collaborator (Author):

Consider the nearly trivial example:

def foo(x, y):
   z = x + y
   z.add_(1)
   return z

If I restore the fusibility of copy_, then I encounter the following:

An error occurred while defining nvFuser FusionDefinition None.
If you believe this is a bug or need assistance, please file an issue at https://github.com/NVIDIA/Fuser/issues/new
Here's a script to reproduce the error:
# CUDA devices:
#  0: NVIDIA RTX 6000 Ada Generation
# torch version: 2.5.1+cu124
# cuda version: 12.4
# nvfuser version: 0.2.23+gitd53be45
import torch
from nvfuser import FusionDefinition, DataType

def nvfuser_incomplete_fusion(fd : FusionDefinition) -> None :
    T0 = fd.define_tensor(shape=[4, 10], contiguity=[True, True], dtype=DataType.Float, is_cpu=False, stride_order=[1, 0])
    T1 = fd.define_tensor(shape=[4, 10], contiguity=[True, True], dtype=DataType.Float, is_cpu=False, stride_order=[1, 0])
    T2 = fd.ops.add(T0, T1)
    T3 = fd.ops.set(T2)
    S4 = fd.define_scalar(1.00000, dtype=DataType.Double)
    T5 = fd.ops.add(T3, S4)
    T6 = fd.ops.set(T5)
    fd.add_output(T6, T3)
    fd.add_output(T3)

with FusionDefinition() as fd:
    nvfuser_fusion_idNone(fd)

Traceback (most recent call last):
  File "/home/blytle/miniforge3/envs/thdrs/lib/python3.10/site-packages/nvfuser/__init__.py", line 105, in __exit__
    self._finalize_definition()
RuntimeError:  INTERNAL ASSERT FAILED at "/workspace/Fuser/csrc/python_frontend/fusion_state.cpp":141, please report a bug with repro script to NVFuser at https://github.com/NVIDIA/Fuser/issues. 
Detected exception while building Fusion Ir. The failing RecordFunctor is: fd.add_output(T6, T3)
NvFuser error message is:  INTERNAL ASSERT FAILED at "/workspace/Fuser/csrc/fusion.cpp":784, please report a bug with repro script to NVFuser at https://github.com/NVIDIA/Fuser/issues. alias source can only be a fusion input

Outside of this function, it's invisible whether the add is in-place or not, and that's how Thunder currently handles it: the in-place-ness disappears:

# CUDA devices:
#  0: NVIDIA RTX 6000 Ada Generation
# torch version: 2.5.1+cu124
# cuda version: 12.4
# nvfuser version: 0.2.23+gitd53be45
import torch
from nvfuser import FusionDefinition, DataType

def nvfuser_fusion_id0(fd : FusionDefinition) -> None :
    T0 = fd.define_tensor(shape=[4, 10], contiguity=[True, True], dtype=DataType.Float, is_cpu=False, stride_order=[1, 0])
    T1 = fd.define_tensor(shape=[4, 10], contiguity=[True, True], dtype=DataType.Float, is_cpu=False, stride_order=[1, 0])
    T2 = fd.ops.add(T0, T1)
    S3 = fd.define_scalar(1.00000, dtype=DataType.Double)
    T4 = fd.ops.add(T2, S3)
    fd.add_output(T4)

with FusionDefinition() as fd:
    nvfuser_fusion_id0(fd)

So the approach of "preserving the in-place operations" doesn't work without some breaking up of the fusion regions. Breaking at every instance of copy_ is too severe, though. If there is a chain of in-place operations (e.g. x.add_(1).mul_(5)), that would be terribly inefficient, and this is where the existing in-place functionalization excels.
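To sketch the trade-off (illustrative code, not Thunder's actual traces): functionalization rewrites such a chain into out-of-place ops with a single write-back at the end, keeping the whole chain fusible, whereas breaking at every copy_ would split it into one region per op:

def chained(x):
    x.add_(1)
    x.mul_(5)
    return x

# functionalized, roughly: one fusible region plus a single copy back
def chained_functionalized(x):
    t0 = x.add(1)
    t1 = t0.mul(5)
    x.copy_(t1)  # one write-back into x's storage
    return x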

Collaborator (Author):

In the even simpler example

def foo(x):
   x.add_(1)
   return x

transforming this to

def foo(x):
    x_alias = update_aliases((x,))
    x_alias.add_(1)
    return x_alias

is already problematic with nvFuser, without introducing more fusion breaks, because x_alias is not an input to the fusion:

def nvfuser_incomplete_fusion(fd : FusionDefinition) -> None :
    T0 = fd.define_tensor(shape=[4, 10], contiguity=[True, True], dtype=DataType.Float, is_cpu=False, stride_order=[1, 0])
    T1 = fd.ops.set(T0)
    S2 = fd.define_scalar(1.00000, dtype=DataType.Double)
    T3 = fd.ops.add(T1, S2)
    T4 = fd.ops.set(T3)
    fd.add_output(T4, T1)
    fd.add_output(T1)

Collaborator:

The current behavior as of 1fa6baf is to exclude update_aliases from the fused nvFuser region; the output of update_aliases then becomes the input to the fused region, and nvFuser works for this small example:

In [1]: import torch

In [2]: import thunder

In [3]: def foo(x):
   ...:    x.add_(1)
   ...:    return x
   ...: 

In [4]: jfoo = thunder.jit(foo, fusion_type="dataflow", skip_inplace_alias_updates=False, skip_inplace_functionalization=True)

In [5]: x = torch.randn(3, 3, device="cuda")

In [6]: jfoo(x);

In [7]: jfoo._lc_cs.last_traces[-1]
Out[7]: 
# Constructed by Unwrap the actual return value
import torch
from thunder.executors.torchex import no_autocast

@torch.no_grad()
@no_autocast
def computation(x):
  # x: "cuda:0 f32[3, 3]"
  (t4,) = update_aliases((x,))
  del x
  [t1] = nvFusion0(t4)
    # t0 = prims.add(t4, 1.0)  # t0: "cuda:0 f32[3, 3]"
    # t1 = prims.copy_(t0, t4, grad_enabled=True)  # t1: "cuda:0 f32[3, 3]"
  del t4
  return (t1,)

@crcrpar (Collaborator) left a comment:

Most comments are questions to help me understand the PR itself. Generally looks nice :)

dtypes=NOTHING,
)
def test_inplace_on_view(executor, device, dtype):
def f(x, _):
Collaborator:

It's interesting that test_inplace_on_view_torch_cuda_None passes while test_inplace_on_view_nvfuser_cuda_None doesn't, even though the execution trace for the failing test case doesn't have an nvFuser region:

def computation(x):
  # x: "cuda:0 f32[2, 3]"
  (t18,) = update_aliases((x,))
  del x
  t25 = Tensor.view(t18, (3, 2))  # t25: "cuda:0 f32[3, 2]"
  (t19, t20) = update_aliases((t18, t25))
  del t18, t25
  t27 = Tensor.__getitem__(t20, 0)  # t27: "cuda:0 f32[2]"
  (t21, t22, _) = update_aliases((t19, t27, t20))
  del t19, t27, t20
  t24 = copy_with_setitem_impl(t22, 0, 0)  # t24: "cuda:0 f32[2]"
  del t24, t22
  return (t21,)

and here's the execution trace for the working case:

def computation(x):
  # x: "cuda:0 f32[2, 3]"
  (t22,) = update_aliases((x,))
  del x

  t23 = Tensor.view(t22, (3, 2))  # t23: "cuda:0 f32[3, 2]"
  (t24, t25) = update_aliases((t22, t23))
  del t22, t23

  t27 = Tensor.__getitem__(t25, 0)  # t27: "cuda:0 f32[2]"
  (t28, t29, _) = update_aliases((t24, t27, t25))
  del t24, t27, t25

  t31 = copy_with_setitem_impl(t29, 0, 0)  # t31: "cuda:0 f32[2]"
  copy_(t31, t29, grad_enabled=True)
  del t31, t29
  return (t28,)

The traces are not the same.

Collaborator (Author):

Indeed. I think what's happening here is that nvFuser's executor runs a DCE (dead code elimination) pass over the bsyms. The logic behind DCE focuses on the outputs of bsyms. You'll notice that in the working case the last interesting line is copy_(t31, t29, grad_enabled=True), which is absent from the non-working case. That line produces no output and hence is removed by nvFuser's fusion pass. I believe this lack of output for copy_ comes from the lack of output in setitem_.
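A hypothetical sketch of such an output-driven DCE, just to show why a zero-output bsym like this copy_ falls through (names and structure are illustrative, not Thunder's actual pass):

def dce(bsyms, trace_outputs):
    # walk the trace backwards, keeping only bsyms whose outputs are live
    live = {p.name for p in trace_outputs}
    kept = []
    for bsym in reversed(bsyms):
        outs = {p.name for p in bsym.flat_proxy_outs}
        if outs & live:
            live |= {p.name for p in bsym.flat_proxy_args}
            kept.append(bsym)
        # a bsym with no outputs can never intersect `live`, so a
        # side-effecting copy_ that returns nothing is dropped here
    return list(reversed(kept))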

)
def test_aliased_input(executor, device, dtype):
def f(x, y, z):
return y.exp_().add(x) + z.exp() # Fails on CUDA because operations have been reordered.
Collaborator:

Here's the execution trace:

def computation(x, y, z):
  # x: "cuda:0 f32[2, 1, 2]"
  # y: "cuda:0 f32[2, 1, 2]"
  # z: "cuda:0 f32[1, 2, 2]"
  (t9,) = update_aliases((y,))
  del y
  [t1, t6] = nvFusion0(t9, x, z)
    # t0 = prims.exp(t9)  # t0: "cuda:0 f32[2, 1, 2]"
    # t1 = prims.copy_(t0, t9, grad_enabled=True)  # t1: "cuda:0 f32[2, 1, 2]"
    # t2 = prims.add(t1, x)  # t2: "cuda:0 f32[2, 1, 2]"
    # t3 = prims.exp(z)  # t3: "cuda:0 f32[1, 2, 2]"
    # t4 = prims.broadcast_in_dim(t2, (2, 2, 2), (0, 1, 2))  # t4: "cuda:0 f32[2, 2, 2]"
    # t5 = prims.broadcast_in_dim(t3, (2, 2, 2), (0, 1, 2))  # t5: "cuda:0 f32[2, 2, 2]"
    # t6 = prims.add(t4, t5)  # t6: "cuda:0 f32[2, 2, 2]"
  del t9
  return (t6,)

The program seems correct. Let's check out the nvFuser definition (jfn._lc_cs.last_traces[-1].python_ctx()["nvFusion0"].last_used):

def nvfuser_fusion_id0(fd : FusionDefinition) -> None :
    T0 = fd.define_tensor(shape=[2, 1, 2], contiguity=[True, None, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T1 = fd.define_tensor(shape=[2, 1, 2], contiguity=[True, None, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T2 = fd.define_tensor(shape=[1, 2, 2], contiguity=[None, True, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T3 = fd.ops.exp(T0)
    T4 = fd.ops.set(T3)
    fd.add_output(T4, T0)
    T5 = fd.ops.add(T0, T1)
    T6 = fd.ops.exp(T2)
    T11 = fd.ops.broadcast_in_dim(T5, shape=[2, 2, 2], broadcast_dims=[0, 1, 2])
    T16 = fd.ops.broadcast_in_dim(T6, shape=[2, 2, 2], broadcast_dims=[0, 1, 2])
    T17 = fd.ops.add(T11, T16)
    fd.add_output(T0)
    fd.add_output(T17)

Could it be due to fd.add_output(T0)? Is it an nvFuser bug or a Thunder-translation-to-nvFuser bug?

Script to reproduce the error using the generated fusion:

import torch
from torch.testing import make_tensor
from nvfuser import FusionDefinition, DataType

def nvfuser_fusion_id0(fd : FusionDefinition) -> None :
    T0 = fd.define_tensor(shape=[2, 1, 2], contiguity=[True, None, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T1 = fd.define_tensor(shape=[2, 1, 2], contiguity=[True, None, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T2 = fd.define_tensor(shape=[1, 2, 2], contiguity=[None, True, True], dtype=DataType.Float, is_cpu=False, stride_order=[2, 1, 0])
    T3 = fd.ops.exp(T0)
    T4 = fd.ops.set(T3)
    fd.add_output(T4, T0)
    T5 = fd.ops.add(T0, T1)
    T6 = fd.ops.exp(T2)
    T11 = fd.ops.broadcast_in_dim(T5, shape=[2, 2, 2], broadcast_dims=[0, 1, 2])
    T16 = fd.ops.broadcast_in_dim(T6, shape=[2, 2, 2], broadcast_dims=[0, 1, 2])
    T17 = fd.ops.add(T11, T16)
    fd.add_output(T0)
    fd.add_output(T17)

with FusionDefinition() as fd:
    nvfuser_fusion_id0(fd)

def f(x, y, z):
    return y, y.exp_().add(x) + z.exp()

a = make_tensor((2, 1, 2), dtype=torch.float32, device="cuda:0")
b = a.clone()
c = a.view(1, 2, 2)
a_ = a.clone().detach()
b_ = b.clone().detach()
c_ = c.clone().detach()
actual = fd.execute([a, b, c])
expected = f(a_, b_, c_)
torch.testing.assert_close(actual, expected)
torch.testing.assert_close(a, a_)
torch.testing.assert_close(b, b_)
torch.testing.assert_close(c, c_)

Collaborator:

There's a line in the nvFuser definition, T5 = fd.ops.add(T0, T1), which should be T5 = fd.ops.add(T4, T1) or T5 = fd.ops.add(T3, T1). It's happening because of this line:


In order to generate an nvFuser fusion definition with T3 or T4 in the add arguments, we need to return either nvcopy_from or alias_output. With both options the test passes. I pushed a commit returning alias_output (ba264c3).

)
def test_inplace_on_chunk(executor, device, dtype):
def f(x):
y, z = x.chunk(2, dim=1) # Fails on CUDA with stride issues

@beverlylytle changed the title from "[WIP] Update aliases" to "Introduce update_aliases" on Feb 26, 2025
@beverlylytle marked this pull request as ready for review on February 26, 2025
@@ -308,8 +308,9 @@ def find_cut(
required_consumer_symbols = tuple(
utils.find_producer_symbols(consumer_trace, consumer.output, external_consumer_inputs)
)
# !!! Filtering out the Nones coming from update_aliases + CDE
Collaborator:

Is this a typo?

Suggested change
# !!! Filtering out the Nones coming from update_aliases + CDE
# !!! Filtering out the Nones coming from update_aliases + DCE

Collaborator (Author):

Oh, yes, thanks.

Comment on lines +33 to +36
def _is_view_creation_op(bsym):
import thunder.torch as ltorch

return bsym.sym in ltorch._syms_returning_views or bsym.sym in ltorch._syms_that_may_return_views
Collaborator:

Would we happen to have corner cases checking the behavior/outputs of insert_alias_updates when _syms_that_may_return_views ops return a new tensor rather than a view?

Collaborator (Author):

Perhaps eventually. The worry with calling update_aliases too frequently is that bsyms that otherwise could have been reordered no longer can be, potentially resulting in slower performance. The added complexity of handling these corner cases needs to be weighed against this as-yet-unknown performance cost.

Comment on lines +75 to +77
in_tensors = map(variableify, filter(lambda p: isinstance(p, TensorProxy), bsym.flat_proxy_args))
if _is_inplace_op(bsym):
in_tensor = list(in_tensors)[0]
Collaborator:

What would happen if bsym.flat_proxy_args is empty?

Suggested change
in_tensors = map(variableify, filter(lambda p: isinstance(p, TensorProxy), bsym.flat_proxy_args))
if _is_inplace_op(bsym):
in_tensor = list(in_tensors)[0]
in_tensors = list(map(variableify, filter(lambda p: isinstance(p, TensorProxy), bsym.flat_proxy_args)))
if _is_inplace_op(bsym) and in_tensors:
in_tensor = list(in_tensors)[0]

out_tensors = set(map(variableify, filter(lambda p: isinstance(p, TensorProxy), bsym.flat_proxy_outs)))
encountered.update(in_tensors)
group = set(reduce(set.union, filter(lambda g: any(g.intersection(in_tensors)), view_groups), set()))
if len(group) == 0:
Collaborator:

PEP8: "For sequences, (strings, lists, tuples), use the fact that empty sequences are false".

Suggested change
if len(group) == 0:
if not group:

encountered.update(out_tensors)
new_bsym = bsym.from_bsym_swap_proxies(swap_map)
bsyms.append(new_bsym)
if _is_inplace_op(bsym) and len(out_tensors) == 1:
Collaborator:

Combined with the previous suggestion about bsym.flat_proxy_args.

Suggested change
if _is_inplace_op(bsym) and len(out_tensors) == 1:
if _is_inplace_op(bsym) and len(out_tensors) == 1 and in_tensors:

# This is a view creation with operands that are not involved in any inplace ops.
bsyms.append(bsym.from_bsym_swap_proxies(swap_map, skip_output=True))
continue
views_encountered = group.intersection(encountered)
Collaborator:

Should we take care of the case when views_encountered is empty?

Suggested change
views_encountered = group.intersection(encountered)
views_encountered = group.intersection(encountered)
if not views_encountered:
bsyms.append(bsym.from_bsym_swap_proxies(swap_map, skip_output=True))
continue
