forked from MAPIRlab/rlrobot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodel.py
More file actions
129 lines (112 loc) · 4.19 KB
/
model.py
File metadata and controls
129 lines (112 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
# +-----------------------------------------------+
# | RL-ROBOT. Reinforcement Learning for Robotics |
# | Angel Martinez-Tenor |
# | MAPIR. University of Malaga. 2016 |
# +-----------------------------------------------+
""" Simulation from Markovian model """
import os.path
import random
import numpy as np
import task
# Model tables. They start as empty placeholders and are replaced by
# generate_t_and_r() or load().
# Transition matrix of the model:
t = np.empty(shape=0)
# Reward matrix of the model:
r = np.empty(shape=0)
# Occurrence counts accumulated by generate_t_and_r() before normalizing:
freq_t = np.empty(shape=0)
freq_r = np.empty(shape=0)
# Initial state (can be obtained directly from a 'SASR_step' datafile);
# -1 until a model has been generated or loaded:
s0 = -1
def _normalize_counts(counts):
    """ Convert occurrence counts into probabilities along the last axis.

    Every slice along the last axis is divided by its own sum. Slices whose
    counts add up to zero (nothing observed) become a uniform distribution
    over the last axis instead.
    """
    if counts.shape[-1] == 0:
        return counts.astype(np.float64)
    totals = counts.sum(axis=-1, keepdims=True)
    # np.maximum only alters zero totals, so the division never hits 0/0;
    # those slices are overwritten with the uniform value just below.
    probabilities = counts / np.maximum(totals, 1)
    uniform = 1.0 / counts.shape[-1]
    return np.where(totals == 0, uniform, probabilities)


def generate_t_and_r(datafile_model, n_episodes_model=1):
    """ Generate Transition and Reward functions from 'SASR_step' datafiles.

    Reads <datafile_model>_SASR_step (or, when n_episodes_model > 1, one
    file <datafile_model>_ep_<i>_SASR_step per episode i). Each row is read
    as: column 0 state, column 1 action, column 2 reached state, column 3
    reward. Occurrences are counted into freq_t / freq_r and then normalized
    into the probability tables t[s, a, sp] and r[s, a, sp, reward_index].
    Entries without any observation get a uniform distribution.

    Updates the module globals t, r, s0, freq_t and freq_r. s0 is the first
    state of the last datafile read.

    Exits through sys.exit() if a datafile cannot be opened. Raises
    IndexError if a stored reward is not present in task.REWARDS.
    """
    global t, r, s0, freq_t, freq_r
    import sys  # only needed to abort when a datafile is missing

    n_states = task.n_states
    n_actions = task.n_actions
    n_rewards = task.REWARDS.size

    t = np.zeros((n_states, n_actions, n_states), dtype=np.float16)
    r = np.zeros((n_states, n_actions, n_states, n_rewards),
                 dtype=np.float16)
    freq_t = np.zeros((n_states, n_actions, n_states), dtype=np.uint32)
    freq_r = np.zeros((n_states, n_actions, n_states, n_rewards),
                      dtype=np.uint32)

    print("Generating T and R. Please wait ...")
    for epi in range(n_episodes_model):
        filename = datafile_model + "_SASR_step"
        if n_episodes_model > 1:
            filename = datafile_model + "_ep_" + str(epi) + "_SASR_step"
        try:
            data = np.load(filename)
        except IOError:
            sys.exit("Error: " + filename + " not found")

        s0 = int(data[0, 0])
        for row in data:
            s = int(row[0])
            a = int(row[1])
            sp = int(row[2])
            # Index of the observed reward inside task.REWARDS.
            ty_re = np.where(task.REWARDS == row[3])[0][0]
            freq_t[s, a, sp] += 1
            freq_r[s, a, sp, ty_re] += 1

    # Normalize one state at a time: vectorized over (action, next state,
    # reward) so the work runs inside NumPy, while temporaries stay small
    # and the progress report is kept.
    for s in range(n_states):
        if s % 100 == 0:
            print("state ", str(s), " of ", str(task.n_states))
        t[s] = _normalize_counts(freq_t[s])
        r[s] = _normalize_counts(freq_r[s])
    return
def get_sp(s, a):
    """ Return a reached state sampled from the model for (s, a).

    Draws one random number and picks the state index whose cumulative
    transition probability t[s, a, :] first exceeds it.

    Returns the sampled state index, or -1 (after printing a warning) when
    the probabilities for (s, a) do not add up to a positive value.
    """
    # Draw first so exactly one random number is consumed on every call.
    rd = random.random()
    # generate_t_and_r() builds T as float16. Summing in float16 loses small
    # probabilities once the running total grows, so with many states the
    # sum can stall below 1; accumulate in float64 instead.
    cumulative = np.cumsum(t[s, a, :task.n_states], dtype=np.float64)
    total = cumulative[-1] if cumulative.size else 0.0
    if total > 0:
        # float16 rounding makes a row sum to slightly more or less than 1;
        # scaling the draw by the actual total keeps it inside the row.
        sp = int(np.searchsorted(cumulative, rd * total, side="right"))
        # Guard against rd * total rounding up to exactly the total.
        return min(sp, cumulative.size - 1)
    print("\n Warning: Model lacks data for T in state: %d" % s + "\n")
    return -1
def get_r(s, a, sp):
    """ Return a reward sampled from the model for the step (s, a, sp).

    Draws one random number and picks the entry of task.REWARDS whose
    cumulative probability r[s, a, sp, :] first exceeds it.

    Returns the sampled reward, or 0 when the probabilities for (s, a, sp)
    do not add up to a positive value.
    """
    # Draw first so exactly one random number is consumed on every call.
    rd = random.random()
    # generate_t_and_r() builds R as float16; accumulate in float64 so the
    # running total is not degraded by half-precision rounding.
    cumulative = np.cumsum(r[s, a, sp, :task.REWARDS.size],
                           dtype=np.float64)
    total = cumulative[-1] if cumulative.size else 0.0
    if not total > 0:
        return 0
    # float16 rounding makes the probabilities sum to slightly more or less
    # than 1; scaling the draw by the actual total always selects a reward.
    index = int(np.searchsorted(cumulative, rd * total, side="right"))
    # Guard against rd * total rounding up to exactly the total.
    return task.REWARDS[min(index, cumulative.size - 1)]
def load(filename, n_episodes_model=1):
    """ Load the model (T, R, s0) from <filename>.npz into t, r and s0.

    If <filename>.npz does not exist, the model is generated with
    generate_t_and_r(filename, n_episodes_model) from the 'SASR_step'
    datafile(s) and then saved, compressed, to <filename>.npz.
    """
    global t, r, s0
    file_model = filename + ".npz"
    if os.path.isfile(file_model):
        print("Model file found")
        with np.load(file_model) as fm:
            t = fm['T']
            r = fm['R']
            # The archive holds s0 as a 0-d array; convert it so s0 is a
            # plain int, the same type generate_t_and_r() assigns.
            s0 = int(fm['s0'])
    else:
        print("Model file not found")
        generate_t_and_r(filename, n_episodes_model)  # create t, r, s0
        # Save model (T, R, s0) to <filename>.npz
        np.savez_compressed(file_model, T=t, R=r, s0=s0)
    return