1
1
from __future__ import annotations
2
2
3
+ import typing
3
4
from dataclasses import dataclass
4
5
from functools import partial
5
- from typing import Any
6
6
7
7
from django .conf import settings
8
+ from django .contrib import admin
8
9
from django .contrib import messages
9
10
from django .contrib .admin .options import BaseModelAdmin
10
11
from django .contrib .admin .templatetags .admin_urls import add_preserved_filters
11
12
from django .core .exceptions import FieldDoesNotExist
12
13
from django .http import HttpRequest
13
14
from django .http import HttpResponse
15
+ from django .http import HttpResponseBadRequest
14
16
from django .http import HttpResponseRedirect
17
+ from django .shortcuts import redirect
18
+ from django .shortcuts import render
19
+ from django .urls import path
20
+ from django .urls import reverse
21
+ from django .utils .module_loading import import_string
15
22
from django .utils .translation import gettext_lazy as _
16
23
17
24
import django_fsm as fsm
18
25
26
+ if typing .TYPE_CHECKING :
27
+ from django .forms import Form
28
+
19
29
try :
20
30
import django_fsm_log # noqa: F401
21
31
except ModuleNotFoundError :
27
37
@dataclass
28
38
class FSMObjectTransition :
29
39
fsm_field : str
30
- block_label : str
31
40
available_transitions : list [fsm .Transition ]
32
41
33
42
@@ -42,55 +51,48 @@ class FSMAdminMixin(BaseModelAdmin):
42
51
fsm_context_key = "fsm_object_transitions"
43
52
fsm_post_param = "_fsm_transition_to"
44
53
default_disallow_transition = not getattr (settings , "FSM_ADMIN_FORCE_PERMIT" , False )
54
+ fsm_transition_form_template = "django_fsm/fsm_admin_transition_form.html"
45
55
46
- def get_fsm_field_instance (self , fsm_field_name : str ) -> fsm .FSMField | None :
47
- try :
48
- return self .model ._meta .get_field (fsm_field_name )
49
- except FieldDoesNotExist :
50
- return None
56
+ def get_urls (self ):
57
+ meta = self .model ._meta
58
+ return [
59
+ path (
60
+ "<path:object_id>/transition/<str:transition_name>/" ,
61
+ self .admin_site .admin_view (self .fsm_transition_view ),
62
+ name = f"{ meta .app_label } _{ meta .model_name } _transition" ,
63
+ ),
64
+ * super ().get_urls (),
65
+ ]
66
+
67
+ def get_readonly_fields (self , request : HttpRequest , obj : typing .Any = None ) -> tuple [str ]:
68
+ """Add FSM fields to readonly fields if they are protected."""
51
69
52
- def get_readonly_fields (self , request : HttpRequest , obj : Any = None ) -> tuple [str ]:
53
70
read_only_fields = super ().get_readonly_fields (request , obj )
54
71
55
72
for fsm_field_name in self .fsm_fields :
56
73
if fsm_field_name in read_only_fields :
57
74
continue
58
- field = self .get_fsm_field_instance (fsm_field_name = fsm_field_name )
59
- if field and getattr (field , "protected" , False ):
60
- read_only_fields += (fsm_field_name ,)
75
+ try :
76
+ field = self .model ._meta .get_field (fsm_field_name )
77
+ except FieldDoesNotExist :
78
+ pass
79
+ else :
80
+ if getattr (field , "protected" , False ):
81
+ read_only_fields += (fsm_field_name ,)
61
82
62
83
return read_only_fields
63
84
64
- @staticmethod
65
- def get_fsm_block_label (fsm_field_name : str ) -> str :
66
- return f"Transition ({ fsm_field_name } )"
67
-
68
- def get_fsm_object_transitions (self , request : HttpRequest , obj : Any ) -> list [FSMObjectTransition ]:
69
- fsm_object_transitions = []
70
-
71
- for field_name in sorted (self .fsm_fields ):
72
- if func := getattr (obj , f"get_available_user_{ field_name } _transitions" ):
73
- fsm_object_transitions .append ( # noqa: PERF401
74
- FSMObjectTransition (
75
- fsm_field = field_name ,
76
- block_label = self .get_fsm_block_label (fsm_field_name = field_name ),
77
- available_transitions = [
78
- t for t in func (user = request .user ) if t .custom .get ("admin" , self .default_disallow_transition )
79
- ],
80
- )
81
- )
82
-
83
- return fsm_object_transitions
84
-
85
85
def change_view (
86
86
self ,
87
87
request : HttpRequest ,
88
88
object_id : str ,
89
89
form_url : str = "" ,
90
- extra_context : dict [str , Any ] | None = None ,
90
+ extra_context : dict [str , typing . Any ] | None = None ,
91
91
) -> HttpResponse :
92
+ """Override the change view to add FSM transitions to the context."""
93
+
92
94
_context = extra_context or {}
93
- _context [self .fsm_context_key ] = self .get_fsm_object_transitions (
95
+ _context [self .fsm_context_key ] = self ._get_fsm_object_transitions (
94
96
request = request ,
95
97
obj = self .get_object (request = request , object_id = object_id ),
96
98
)
@@ -102,24 +104,19 @@ def change_view(
102
104
extra_context = _context ,
103
105
)
104
106
105
- def get_fsm_redirect_url (self , request : HttpRequest , obj : Any ) -> str :
106
- return request .path
107
-
108
- def get_fsm_response (self , request : HttpRequest , obj : Any ) -> HttpResponse :
109
- redirect_url = self .get_fsm_redirect_url (request = request , obj = obj )
110
- redirect_url = add_preserved_filters (
111
- context = {
112
- "preserved_filters" : self .get_preserved_filters (request ),
113
- "opts" : self .model ._meta ,
114
- },
115
- url = redirect_url ,
116
- )
117
- return HttpResponseRedirect (redirect_to = redirect_url )
107
+ def _get_fsm_object_transitions (self , request : HttpRequest , obj : typing .Any ) -> list [FSMObjectTransition ]:
108
+ for field_name in sorted (self .fsm_fields ):
109
+ if func := getattr (obj , f"get_available_user_{ field_name } _transitions" ):
110
+ yield FSMObjectTransition (
111
+ fsm_field = field_name ,
112
+ available_transitions = [
113
+ t for t in func (user = request .user ) if t .custom .get ("admin" , self .default_disallow_transition )
114
+ ],
115
+ )
118
116
119
- def response_change (self , request : HttpRequest , obj : Any ) -> HttpResponse :
120
- if self . fsm_post_param in request .POST :
117
+ def response_change (self , request : HttpRequest , obj : typing . Any ) -> HttpResponse : # noqa: C901
118
+ if transition_name := request .POST . get ( self . fsm_post_param ) :
121
119
try :
122
- transition_name = request .POST [self .fsm_post_param ]
123
120
transition_func = getattr (obj , transition_name )
124
121
except AttributeError :
125
122
self .message_user (
@@ -129,9 +126,18 @@ def response_change(self, request: HttpRequest, obj: Any) -> HttpResponse:
129
126
),
130
127
level = messages .ERROR ,
131
128
)
132
- return self .get_fsm_response (
133
- request = request ,
134
- obj = obj ,
129
+ return self .get_fsm_response (request = request , obj = obj )
130
+
131
+ # NOTE: if a form is defined in the transition.custom, we redirect to the form view
132
+ if self .get_fsm_transition_custom (instance = obj , transition_func = transition_func ).get ("form" ):
133
+ return redirect (
134
+ reverse (
135
+ f"admin:{ self .model ._meta .app_label } _{ self .model ._meta .model_name } _transition" ,
136
+ kwargs = {
137
+ "object_id" : obj .pk ,
138
+ "transition_name" : transition_name ,
139
+ },
140
+ )
135
141
)
136
142
137
143
try :
@@ -173,9 +179,102 @@ def response_change(self, request: HttpRequest, obj: Any) -> HttpResponse:
173
179
level = messages .INFO ,
174
180
)
175
181
176
- return self .get_fsm_response (
177
- request = request ,
178
- obj = obj ,
179
- )
182
+ return self .get_fsm_response (request = request , obj = obj )
180
183
181
184
return super ().response_change (request = request , obj = obj )
185
+
186
+ def get_fsm_response (self , request : HttpRequest , obj : typing .Any ) -> HttpResponse :
187
+ redirect_url = add_preserved_filters (
188
+ context = {
189
+ "preserved_filters" : self .get_preserved_filters (request ),
190
+ "opts" : self .model ._meta ,
191
+ },
192
+ url = self .get_fsm_redirect_url (request = request , obj = obj ),
193
+ )
194
+ return HttpResponseRedirect (redirect_to = redirect_url )
195
+
196
+ def get_fsm_redirect_url (self , request : HttpRequest , obj : typing .Any ) -> str :
197
+ return request .path
198
+
199
+ def get_fsm_transition_custom (self , instance , transition_func ):
200
+ """Helper function to get custom attributes for the current transition"""
201
+ return getattr (self .get_fsm_transition (instance , transition_func ), "custom" , {})
202
+
203
+ def get_fsm_transition (self , instance , transition_func ) -> fsm .Transition | None :
204
+ """
205
+ Extract custom attributes from a transition function for the current state.
206
+ """
207
+ if not hasattr (transition_func , "_django_fsm" ):
208
+ return None
209
+
210
+ fsm_meta = transition_func ._django_fsm
211
+ current_state = fsm_meta .field .get_state (instance )
212
+ return fsm_meta .get_transition (current_state )
213
+
214
+ def get_fsm_transition_form (self , transition : fsm .Transition ) -> Form | None :
215
+ form = transition .custom .get ("form" )
216
+ if isinstance (form , str ):
217
+ form = import_string (form )
218
+ return form
219
+
220
+ def fsm_transition_view (self , request , * args , ** kwargs ):
221
+ transition_name = kwargs ["transition_name" ]
222
+ obj = self .get_object (request , kwargs ["object_id" ])
223
+
224
+ transition_method = getattr (obj , transition_name )
225
+ if not hasattr (transition_method , "_django_fsm" ):
226
+ return HttpResponseBadRequest (f"{ transition_name } is not a transition method" )
227
+
228
+ transitions = transition_method ._django_fsm .transitions
229
+ if isinstance (transitions , dict ):
230
+ transitions = list (transitions .values ())
231
+ transition = transitions [0 ]
232
+
233
+ if TransitionForm := self .get_fsm_transition_form (transition ):
234
+ if request .method == "POST" :
235
+ transition_form = TransitionForm (data = request .POST , instance = obj )
236
+ if transition_form .is_valid ():
237
+ transition_method (** transition_form .cleaned_data )
238
+ obj .save ()
239
+ else :
240
+ return render (
241
+ request ,
242
+ self .fsm_transition_form_template ,
243
+ context = admin .site .each_context (request )
244
+ | {
245
+ "opts" : self .model ._meta ,
246
+ "original" : obj ,
247
+ "transition" : transition ,
248
+ "transition_form" : transition_form ,
249
+ },
250
+ )
251
+ else :
252
+ transition_form = TransitionForm (instance = obj )
253
+ return render (
254
+ request ,
255
+ self .fsm_transition_form_template ,
256
+ context = admin .site .each_context (request )
257
+ | {
258
+ "opts" : self .model ._meta ,
259
+ "original" : obj ,
260
+ "transition" : transition ,
261
+ "transition_form" : transition_form ,
262
+ },
263
+ )
264
+ else :
265
+ try :
266
+ transition_method ()
267
+ except fsm .TransitionNotAllowed :
268
+ self .message_user (
269
+ request ,
270
+ self .fsm_transition_not_allowed_msg .format (transition_name = transition_name ),
271
+ messages .ERROR ,
272
+ )
273
+ else :
274
+ obj .save ()
275
+ self .message_user (
276
+ request ,
277
+ self .fsm_transition_success_msg .format (transition_name = transition_name ),
278
+ messages .SUCCESS ,
279
+ )
280
+ return redirect (f"admin:{ self .model ._meta .app_label } _{ self .model ._meta .model_name } _change" , object_id = obj .id )
0 commit comments