-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsarsa_lambda.py
More file actions
65 lines (55 loc) · 1.96 KB
/
sarsa_lambda.py
File metadata and controls
65 lines (55 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from __future__ import annotations
from typing import Optional
import numpy as np
def sarsa_lambda_control(
    env,
    gamma: float = 0.99,
    alpha: float = 0.1,
    lam: float = 0.9,
    epsilon: float = 0.1,
    episodes: int = 1000,
    n_states: Optional[int] = None,
    n_actions: Optional[int] = None,
    trace_type: str = 'accumulating',
    seed: Optional[int] = None,
) -> np.ndarray:
    '''
    On-policy SARSA(lambda) control with eligibility traces (tabular Q).

    The behaviour policy is epsilon-greedy with respect to the current Q
    table; greedy ties are resolved by ``np.argmax`` (lowest action index).

    Parameters
    ----------
    env
        Environment exposing ``reset()`` and ``step(action)``. ``reset`` may
        return either the initial state or an ``(state, info_dict)`` pair.
        ``step`` may return ``(state, reward, done, ...)`` or the 5-element
        ``(state, reward, terminated, truncated, info)`` form. States and
        actions are used directly as row/column indices into Q.
    gamma
        Discount factor.
    alpha
        Step size applied to every TD update.
    lam
        Trace-decay parameter; traces are scaled by ``gamma * lam`` each step.
    epsilon
        Probability of picking a uniformly random action.
    episodes
        Number of episodes to run.
    n_states, n_actions
        Table dimensions. When omitted they are read from ``env.n_states`` /
        ``env.nS`` and ``env.n_actions`` / ``env.nA`` respectively.
    trace_type
        ``'accumulating'`` adds 1 to the visited pair's trace.
        ``'replacing'`` clears the traces of all actions in the visited state
        and sets the visited pair's trace to 1.
    seed
        Seed forwarded to ``np.random.default_rng``.

    Returns
    -------
    np.ndarray
        The learned action-value table of shape ``(n_states, n_actions)``.

    Raises
    ------
    ValueError
        If a table dimension cannot be determined, or ``trace_type`` is not
        one of the two supported values.
    '''
    if n_states is None:
        n_states = getattr(env, 'n_states', getattr(env, 'nS', None))
    if n_states is None:
        raise ValueError('Provide n_states or ensure env has n_states/nS.')
    if n_actions is None:
        n_actions = getattr(env, 'n_actions', getattr(env, 'nA', None))
    if n_actions is None:
        raise ValueError('Provide n_actions or ensure env has n_actions/nA.')
    # Validate once, before touching the environment, so a bad value is
    # reported even when no episode is run.
    if trace_type not in ('accumulating', 'replacing'):
        raise ValueError("trace_type must be 'accumulating' or 'replacing'.")

    rng = np.random.default_rng(seed)
    Q = np.zeros((n_states, n_actions), dtype=float)
    replacing = trace_type == 'replacing'
    decay = gamma * lam

    def eps_greedy(s: int) -> int:
        if rng.random() < epsilon:
            return int(rng.integers(n_actions))
        return int(np.argmax(Q[s]))

    for _ in range(episodes):
        E = np.zeros_like(Q)  # eligibility for (s, a), reset every episode
        s = env.reset()
        # A reset that returns (state, info_dict) must be unwrapped; a bare
        # state is passed through untouched.
        if isinstance(s, tuple) and len(s) == 2 and isinstance(s[1], dict):
            s = s[0]
        a = eps_greedy(s)

        while True:
            outcome = tuple(env.step(a))
            if len(outcome) == 5:
                s_next, r, terminated, truncated = outcome[:4]
            else:
                s_next, r, terminated, *_ = outcome
                truncated = False

            # Only a true terminal state has zero continuation value; a
            # truncated episode still bootstraps from the next pair.
            a_next = 0 if terminated else eps_greedy(s_next)
            q_next = 0.0 if terminated else Q[s_next, a_next]
            delta = r + gamma * q_next - Q[s, a]

            # Decay all traces, then reinforce the current pair.
            E *= decay
            if replacing:
                E[s, :] = 0.0
                E[s, a] = 1.0
            else:
                E[s, a] += 1.0

            Q += alpha * delta * E

            if terminated or truncated:
                break
            s, a = s_next, a_next

    return Q