forked from Skylark0924/Rofunc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tpgmm.py
740 lines (619 loc) · 36.3 KB
/
tpgmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
# Copyright 2023, Junjia LIU, [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import scipy
from numpy import ndarray
import rofunc as rf
from rofunc.learning.ml.gmm import GMM
from rofunc.learning.ml.hmm import HMM
from rofunc.utils.logger.beauty_logger import beauty_print
class TPGMM:
def __init__(self, demos_x: Union[List, np.ndarray], task_params: dict, nb_states: int = 4, nb_frames: int = None,
reg: float = 1e-3, plot: bool = False, save: bool = False, save_params: dict = None):
"""
Task-parameterized Gaussian Mixture Model (TP-GMM)
:param demos_x: demo displacement
:param task_params: task parameters, require {'frame_origins': [start_xdx, end_xdx]}
:param nb_states: number of states in the HMM
:param nb_frames: number of candidate frames in a task-parameterized mixture
:param reg: regularization coefficient
:param plot: whether to plot the result
:param save: whether to save the result
:param save_params: save parameters, {'save_dir': 'path/to/save', 'save_format': 'eps'}
"""
self.demos_x = demos_x
self.nb_states = nb_states
self.reg = reg
self.horizon = len(demos_x[0])
self.plot = plot
self.save = save
self.save_params = save_params
if self.save:
if 'save_dir' not in self.save_params or 'format' not in self.save_params:
raise ValueError('Please specify the save dir and format!')
beauty_print(
'Save dir: {}, format: {}'.format(self.save_params['save_dir'], self.save_params['format']),
type='info')
self.nb_dim = len(self.demos_x[0][0])
self.nb_deriv = 2 # TODO: for now it is just original state and its first derivative
self.task_params = task_params
self.nb_frames = len(task_params['frame_origins']) if nb_frames is None else nb_frames
"""
Some related matrices are generated from the demo data with displacement
M: Number of demonstrated trajectories in a training set (m will be used as index)
T: Number of datapoints in a trajectory (t will be used as index)
P: Number of candidate frames in a task-parameterized mixture (j will be used as index/exponent)
nb_dim: Dimension of the demo state
nb_deriv: Number of derivatives of the demo state
demos_xdx: concat original states with their first derivative, [M, T, nb_dim * nb_deriv]
demos_A: the orientation of the p-th candidate coordinate system for this demonstration, [M, T, P, nb_dim, nb_dim]
demos_b: the position of the p-th candidate coordinate system for this demonstration, [M, T, P, nb_dim]
demos_A_xdx: augment demos_A to original states and their first derivative, [M, T, P, nb_dim * nb_deriv, nb_dim * nb_deriv]
demos_b_xdx: augment demos_b to original states and their first derivative, [M, T, P, nb_dim * nb_deriv]
demos_xdx_f: states and their first derivative in P frames, [M, T, P, nb_dim * nb_deriv]
demos_xdx_augm: reshape demos_xdx_f, [M, T, nb_dim * nb_deriv * P]
"""
self.demos_dx, self.demos_A, self.demos_b, self.demos_A_xdx, self.demos_b_xdx, self.demos_xdx, \
self.demos_xdx_f, self.demos_xdx_augm = self.get_related_matrix()
def get_dx(self):
demos_dx = []
for i in range(len(self.demos_x)):
demo_dx = []
for j in range(len(self.demos_x[i])):
if 0 < j < len(self.demos_x[i]) - 1:
dx = (self.demos_x[i][j + 1] - self.demos_x[i][j - 1]) / 2
elif j == len(self.demos_x[i]) - 1: # final state
# dx = self.demos_x[i][j] - self.demos_x[i][j - 1]
dx = np.zeros(self.nb_dim)
else: # initial state j = 0
# dx = self.demos_x[i][j + 1] - self.demos_x[i][j]
dx = np.zeros(self.nb_dim)
dx = dx / 0.01
demo_dx.append(dx)
demos_dx.append(np.array(demo_dx))
return demos_dx
def get_A_b(self):
"""
Get transformation matrices from custom task parameters
:return: demos_A, demos_b, demos_A_xdx, demos_b_xdx
"""
task_params = self.task_params
if 'demos_A_xdx' in task_params and 'demos_b_xdx' in task_params:
demos_A, demos_b, demos_A_xdx, demos_b_xdx = task_params['demos_A'], task_params['demos_b'], task_params[
'demos_A_xdx'], task_params['demos_b_xdx']
elif 'frame_origins' in task_params:
frame_origins = task_params['frame_origins']
demos_b = []
demos_A = []
for i in range(len(frame_origins[0])): # TODO: check this
demos_b.append(np.tile(np.vstack([frame[i][: self.nb_dim] for frame in frame_origins]),
(len(self.demos_x[i]), 1, 1)))
demos_A.append(np.tile([np.eye(self.nb_dim)] * self.nb_frames, (len(self.demos_x[i]), 1, 1, 1)))
demos_A_xdx = [np.kron(np.eye(self.nb_deriv), demos_A[i]) for i in range(len(demos_A))]
demos_b_xdx = [np.concatenate([d, np.zeros(d.shape)], axis=-1) for d in demos_b]
else:
raise ValueError('Invalid task_params')
self.demo_A = demos_A
self.demo_b = demos_b
self.demo_A_xdx = demos_A_xdx
self.demo_b_xdx = demos_b_xdx
return self.demo_A, self.demo_b, self.demo_A_xdx, self.demo_b_xdx
def get_related_matrix(self):
demos_dx = self.get_dx()
demos_A, demos_b, demos_A_xdx, demos_b_xdx = self.get_A_b()
demos_xdx = [np.hstack([_x, _dx]) for _x, _dx in
zip(self.demos_x, demos_dx)] # Position and velocity (nb_points, 14)
demos_xdx_f = [np.einsum('taji,taj->tai', _A, _x[:, None] - _b) for _x, _A, _b in
zip(demos_xdx, demos_A_xdx, demos_b_xdx)]
demos_xdx_augm = [d.reshape(-1, self.nb_deriv * self.nb_frames * self.nb_dim) for d in
demos_xdx_f] # (nb_points, 14 * nb_frames)
return demos_dx, demos_A, demos_b, demos_A_xdx, demos_b_xdx, demos_xdx, demos_xdx_f, demos_xdx_augm
def hmm_learning(self) -> HMM:
beauty_print('{}'.format('learning HMM model'), type='info')
model = HMM(nb_states=self.nb_states)
model.init_hmm_kbins(self.demos_xdx_augm) # initializing model
model.em(self.demos_xdx_augm, reg=self.reg)
fig = rf.ml.hmm_plot(self.nb_dim, self.demos_xdx_f, model, self.task_params)
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return model
def poe(self, model: HMM, show_demo_idx: int) -> GMM:
"""
Product of Expert/Gaussian (PoE), which calculates the mixture distribution from multiple coordinates
:param model: learned model
:param show_demo_idx: index of the specific demo to be reproduced
:return: The product of experts
"""
mod_list = []
# get transformation for given demonstration.
# We use the transformation of the first timestep as they are constant
if len(self.task_params['frame_origins'][0]) == 1: # For new task parameters generation
A, b = self.demo_A_xdx[0][0], self.demo_b_xdx[0][0] # Attention: here we use self.demo_A_xdx not
# self.demos_A_xdx, because we just called get_A_b not get_related_matrix
else:
A, b = self.demos_A_xdx[show_demo_idx][0], self.demos_b_xdx[show_demo_idx][0]
for p in range(self.nb_frames):
# transformed model for coordinate system p
mod_list.append(model.marginal_model(slice(p * len(b[0]), (p + 1) * len(b[0]))).lintrans(A[p], b[p]))
# product
prod = mod_list[0]
for p in range(1, self.nb_frames):
prod *= mod_list[p]
fig = rf.ml.poe_plot(self.nb_dim, mod_list, prod, self.demos_x, show_demo_idx, self.task_params)
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return prod
def _reproduce(self, model: HMM, prod: GMM, show_demo_idx: int, start_xdx: np.ndarray, title: str = None,
label: str = None) -> np.ndarray:
"""
Reproduce the specific demo_idx from the learned model
:param model: learned model
:param prod: result of PoE
:param show_demo_idx: index of the specific demo to be reproduced
:param start_xdx: start state
:param title: title of the plot
:param label: label of the plot
:return:
"""
# get the most probable sequence of state for this demonstration
sq = model.viterbi(self.demos_xdx_augm[show_demo_idx])
# solving LQR with Product of Gaussian, see notebook on LQR
lqr = rf.lqr.PoGLQR(nb_dim=self.nb_dim, dt=0.01, horizon=self.demos_xdx[show_demo_idx].shape[0])
lqr.mvn_xi = prod.concatenate_gaussian(sq) # augmented version of gaussian
lqr.mvn_u = -4
lqr.x0 = start_xdx
xi = lqr.seq_xi
fig = rf.ml.gen_plot(self.nb_dim, xi, prod, self.demos_x, show_demo_idx, title=title, label=label)
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return xi
def fit(self) -> HMM:
"""
Learning the single arm/agent trajectory representation from demonstration via TP-GMM.
"""
beauty_print('Learning the trajectory representation from demonstration via TP-GMM')
model = self.hmm_learning()
return model
def reproduce(self, model: HMM, show_demo_idx: int) -> Tuple[np.ndarray, GMM]:
"""
Reproduce the specific demo_idx from the learned model
"""
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')
prod = self.poe(model, show_demo_idx)
traj = self._reproduce(model, prod, show_demo_idx, self.demos_xdx[show_demo_idx][0],
title="Trajectory reproduction", label="reproduced line")
return traj, prod
def generate(self, model: HMM, ref_demo_idx: int) -> Tuple[np.ndarray, GMM]:
"""
Generate a new trajectory from the learned model
"""
beauty_print('generate a trajectory from learned representation with new task parameters', type='info')
self.get_A_b()
prod = self.poe(model, ref_demo_idx)
traj = self._reproduce(model, prod, ref_demo_idx, self.task_params['frame_origins'][0][0],
title="Trajectory generation", label="generated line")
return traj, prod
class TPGMMBi(TPGMM):
"""
Simple TPGMMBi (no coordination)
"""
def __init__(self, demos_left_x: Union[List, np.ndarray], demos_right_x: Union[List, np.ndarray], task_params: dict,
nb_states: int = 4, reg: float = 1e-3, plot: bool = False, save: bool = False,
save_params: dict = None):
assert len(demos_left_x) == len(
demos_right_x), 'The number of demonstrations for left and right arm should be the same'
assert len(demos_left_x[0]) == len(
demos_right_x[0]), 'The number of timesteps for left and right arm should be the same'
self.demos_left_x = demos_left_x
self.demos_right_x = demos_right_x
self.nb_dim = len(demos_left_x[0][0])
self.plot = plot
self.save = save
self.save_params = save_params
self.repr_l = TPGMM(demos_left_x, task_params['left'], nb_states=nb_states, reg=reg, plot=plot,
save=save, save_params=save_params)
self.repr_r = TPGMM(demos_right_x, task_params['right'], nb_states=nb_states, reg=reg, plot=plot,
save=save, save_params=save_params)
def fit(self) -> Tuple[HMM, HMM]:
"""
Learning the single arm/agent trajectory representation from demonstration via TP-GMM.
"""
beauty_print('Learning the bimanual trajectory representation from demonstration via TP-GMM')
model_l = self.repr_l.hmm_learning()
model_r = self.repr_r.hmm_learning()
return model_l, model_r
def reproduce(self, models: List, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Reproduce the specific demo_idx from the learned model
"""
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')
model_l, model_r = models
prod_l = self.repr_l.poe(model_l, show_demo_idx)
prod_r = self.repr_r.poe(model_r, show_demo_idx)
traj_l = self.repr_l._reproduce(model_l, prod_l, show_demo_idx, self.repr_l.demos_xdx[show_demo_idx][0])
traj_r = self.repr_r._reproduce(model_r, prod_r, show_demo_idx, self.repr_r.demos_xdx[show_demo_idx][0])
data_lst = [traj_l[:, :self.repr_l.nb_dim], traj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Reproduced bimanual trajectories')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return traj_l, traj_r, prod_l, prod_r
def generate(self, models: List, ref_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Generate a new trajectory from the learned model
"""
beauty_print('generate trajectories from learned representation with new task parameters', type='info')
model_l, model_r = models
self.repr_l.task_params = self.task_params['left']
self.repr_r.task_params = self.task_params['right']
self.repr_l.get_A_b()
self.repr_r.get_A_b()
prod_l = self.repr_l.poe(model_l, ref_demo_idx)
prod_r = self.repr_r.poe(model_r, ref_demo_idx)
traj_l = self.repr_l._reproduce(model_l, prod_l, ref_demo_idx, self.repr_l.task_params['frame_origins'][0][0])
traj_r = self.repr_r._reproduce(model_r, prod_r, ref_demo_idx, self.repr_r.task_params['frame_origins'][0][0])
data_lst = [traj_l[:, :self.repr_l.nb_dim], traj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return traj_l, traj_r, prod_l, prod_r
class TPGMM_RPCtrl(TPGMMBi):
"""
Simple TPGMMBi (no coordination in the representation) with bimanual coordination in the LQR controller
"""
def __init__(self, demos_left_x, demos_right_x, task_params, nb_states: int = 4, reg: float = 1e-3,
plot: bool = False, save: bool = False, save_params: dict = None):
super().__init__(demos_left_x, demos_right_x, task_params, nb_states=nb_states, reg=reg, plot=plot,
save=save, save_params=save_params)
self.demos_com_x = self.get_rel_demos()
start_xdx = [self.demos_com_x[i][0] for i in range(len(self.demos_com_x))] # TODO: change to xdx
end_xdx = [self.demos_com_x[i][-1] for i in range(len(self.demos_com_x))]
task_params_c = {'frame_origins': [start_xdx, end_xdx], 'frame_names': ['start', 'end']}
self.repr_c = TPGMM(self.demos_com_x, task_params_c, nb_states=nb_states, reg=reg, plot=plot, save=save,
save_params=save_params)
def get_rel_demos(self):
# calculate the relative movement of each demo
rel_demos = []
for i in range(len(self.demos_left_x)):
rel_demo = np.zeros_like(self.demos_left_x[i])
for j in range(len(self.demos_left_x[i])):
# rel_demos[i, j] = np.linalg.norm(left_x[i, j] - right_x[i, j])
rel_demo[j] = self.demos_left_x[i][j] - self.demos_right_x[i][j]
rel_demos.append(rel_demo)
return rel_demos
def _bi_reproduce(self, model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx, start_xdx_lst):
"""
Reproduce the specific demo_idx from the learned model
:param model_l: learned model for left arm
:param prod_l: result of PoE for left arm
:param model_r: learned model for right arm
:param prod_r: result of PoE for ri+588/5/8ght arm
:param model_c: learned model for relative movement
:param prod_c: result of PoE for relative movement
:param show_demo_idx: index of the specific demo to be reproduced
:param start_xdx_lst: the start states for both arms, List
:return:
"""
# get the most probable sequence of state for this demonstration
sq_l = model_l.viterbi(self.repr_l.demos_xdx_augm[show_demo_idx])
sq_r = model_r.viterbi(self.repr_r.demos_xdx_augm[show_demo_idx])
sq_c = model_c.viterbi(self.repr_c.demos_xdx_augm[show_demo_idx])
# solving LQR with Product of Gaussian, see notebook on LQR
lqr = rf.lqr.PoGLQRBi(nb_dim=self.nb_dim, dt=0.01, horizon=self.demos_left_x[show_demo_idx].shape[0])
lqr.mvn_xi_l = prod_l.concatenate_gaussian(sq_l) # augmented version of gaussian
lqr.mvn_xi_r = prod_r.concatenate_gaussian(sq_r) # augmented version of gaussian
lqr.mvn_xi_c = prod_c.concatenate_gaussian(sq_c) # augmented version of gaussian
lqr.mvn_u = -4 # N(0, R_s) R_s: R^{DTxDT}
lqr.x0_l = start_xdx_lst[0] # zeta_0 R^DC => 4
lqr.x0_r = start_xdx_lst[1]
lqr.x0_c = start_xdx_lst[2]
xi_l, xi_r = lqr.seq_xi
return xi_l, xi_r
def fit(self) -> Tuple[HMM, HMM, HMM]:
model_l, model_r = super().fit()
model_c = self.repr_c.hmm_learning()
return model_l, model_r, model_c
def reproduce(self, models, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Reproduce the specific demo_idx from the learned model
:param models: List of learned models for left, right and relative movement
:param show_demo_idx: index of the specific demo to be reproduced
:return:
"""
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')
model_l, model_r, model_c = models
prod_l = self.repr_l.poe(model_l, show_demo_idx=show_demo_idx)
prod_r = self.repr_r.poe(model_r, show_demo_idx=show_demo_idx)
prod_c = self.repr_c.poe(model_c, show_demo_idx=show_demo_idx)
# Coordinated trajectory
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx,
start_xdx_lst=[self.repr_l.demos_xdx[show_demo_idx][0],
self.repr_r.demos_xdx[show_demo_idx][0],
self.repr_c.demos_xdx[show_demo_idx][0]])
data_lst = [ctraj_l[:, :self.repr_l.nb_dim], ctraj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Reproduced bimanual trajectories')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return ctraj_l, ctraj_r, prod_l, prod_r
def generate(self, models: List, ref_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Generate a new trajectory from the learned model
:param models: List of learned models for left, right and relative movement
:param ref_demo_idx: index of the specific demo to be referenced
:return:
"""
beauty_print('generate trajectories from learned representation with new task parameters', type='info')
model_l, model_r, model_c = models
self.repr_l.task_params = self.task_params['left']
self.repr_r.task_params = self.task_params['right']
self.repr_c.task_params = self.task_params['relative']
self.repr_l.get_A_b()
self.repr_r.get_A_b()
self.repr_c.get_A_b()
prod_l = self.repr_l.poe(model_l, ref_demo_idx)
prod_r = self.repr_r.poe(model_r, ref_demo_idx)
prod_c = self.repr_c.poe(model_c, ref_demo_idx)
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, ref_demo_idx,
# start_xdx_lst=[self.repr_l.task_params['frame_origins'][0][0],
# self.repr_r.task_params['frame_origins'][0][0],
# self.repr_c.task_params['frame_origins'][0][0]])
start_xdx_lst=[self.repr_l.demos_xdx[ref_demo_idx][0],
self.repr_r.demos_xdx[ref_demo_idx][0],
self.repr_c.demos_xdx[ref_demo_idx][0]])
data_lst = [ctraj_l[:, :self.repr_l.nb_dim], ctraj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return ctraj_l, ctraj_r, prod_l, prod_r
class TPGMM_RPRepr(TPGMMBi):
"""
TPGMM for bimanual coordination in representation
"""
def __init__(self, demos_left_x, demos_right_x, task_params, nb_states: int = 4, reg: float = 1e-3,
plot: bool = False, save: bool = False, save_params: dict = None, **kwargs):
self.demos_left_x = demos_left_x
self.demos_right_x = demos_right_x
self.nb_dim = len(demos_left_x[0][0])
self.plot = plot
self.save = save
self.save_params = save_params
self.nb_frames = len(task_params['left']['frame_origins']) + 1 # Additional observe frame from the other arm
self.nb_deriv = 2
self.nb_states = nb_states
self.task_params = task_params
self.get_rel_task_params()
self.repr_l = TPGMM(demos_left_x, self.task_params['left'], nb_states=nb_states, nb_frames=self.nb_frames,
reg=reg, plot=plot, save=save, save_params=save_params)
self.repr_r = TPGMM(demos_right_x, self.task_params['right'], nb_states=nb_states, nb_frames=self.nb_frames,
reg=reg, plot=plot, save=save, save_params=save_params)
def get_rel_task_params(self):
demos_b_l, demos_b_r = [], []
demos_A_l, demos_A_r = [], []
frame_origins_l = self.task_params['left']['frame_origins']
frame_origins_r = self.task_params['right']['frame_origins']
for i in range(len(frame_origins_l[0])):
demos_A_l.append(np.tile([np.eye(self.nb_dim)] * self.nb_frames, (len(self.demos_left_x[i]), 1, 1, 1)))
demos_A_r.append(np.tile([np.eye(self.nb_dim)] * self.nb_frames, (len(self.demos_right_x[i]), 1, 1, 1)))
demos_b_l.append(np.hstack((np.tile(np.vstack([frame[i][: self.nb_dim] for frame in frame_origins_l]),
(len(self.demos_left_x[i]), 1, 1)),
np.expand_dims(self.demos_right_x[i], axis=1))))
demos_b_r.append(np.hstack((np.tile(np.vstack([frame[i][: self.nb_dim] for frame in frame_origins_r]),
(len(self.demos_right_x[i]), 1, 1)),
np.expand_dims(self.demos_left_x[i], axis=1))))
demos_A_xdx_l = [np.kron(np.eye(self.nb_deriv), demos_A_l[i]) for i in range(len(demos_A_l))]
demos_A_xdx_r = [np.kron(np.eye(self.nb_deriv), demos_A_r[i]) for i in range(len(demos_A_r))]
demos_b_xdx_l = [np.concatenate([d, np.zeros(d.shape)], axis=-1) for d in demos_b_l]
demos_b_xdx_r = [np.concatenate([d, np.zeros(d.shape)], axis=-1) for d in demos_b_r]
self.task_params = {
'left': {'frame_names': self.task_params['left']['frame_names'] + ['relative'],
'demos_A': demos_A_l, 'demos_b': demos_b_l,
'demos_A_xdx': demos_A_xdx_l, 'demos_b_xdx': demos_b_xdx_l},
'right': {'frame_names': self.task_params['right']['frame_names'] + ['relative'],
'demos_A': demos_A_r, 'demos_b': demos_b_r,
'demos_A_xdx': demos_A_xdx_r, 'demos_b_xdx': demos_b_xdx_r}}
def _get_dyna_A_b(self, model, repr, show_demo_idx, task_params=None):
ref_frame = 0 # TODO: 0 for start frame, 1 for end frame, 2 for the relative frame
if task_params is not None:
repr.P = 2
demos_dx, demos_A, demos_b, demos_A_xdx, demos_b_xdx, demos_xdx, demos_xdx_f, demos_xdx_augm = repr.get_related_matrix(
task_params)
repr.P = 3
# demos_xdx_f = [
# np.concatenate([demos_xdx_f[0], np.expand_dims(repr.demos_xdx_f[show_demo_idx][:, 2], axis=1)], axis=1)]
demos_xdx_f = repr.demos_xdx_f
demos_A_xdx = [
np.concatenate([demos_A_xdx[0], np.expand_dims(repr.demos_A_xdx[show_demo_idx][:, 2], axis=1)], axis=1)]
demos_b_xdx = [
np.concatenate([demos_b_xdx[0], np.expand_dims(repr.demos_b_xdx[show_demo_idx][:, 2], axis=1)], axis=1)]
show_demo_idx = 0
else:
demos_xdx_f = repr.demos_xdx_f
demos_A_xdx = repr.demos_A_xdx
demos_b_xdx = repr.demos_b_xdx
mus = model._mu[:,
self.nb_dim * self.nb_deriv * ref_frame:self.nb_dim * self.nb_deriv * ref_frame + self.nb_dim]
demo_traj_in_ref_frame = demos_xdx_f[show_demo_idx][:, ref_frame, :self.nb_dim]
index_list = []
for mu in mus:
dist = list(scipy.spatial.distance.cdist([mu], demo_traj_in_ref_frame)[0])
index = dist.index(min(dist))
index_list.append(index)
A, b = demos_A_xdx[show_demo_idx][index_list], demos_b_xdx[show_demo_idx][index_list]
return A, b, index_list
def _uni_poe(self, model, repr, show_demo_idx, task_params=None) -> GMM:
if task_params is None:
A, b, _ = self._get_dyna_A_b(model, repr, show_demo_idx)
else:
A, b = task_params['A'], task_params['b']
mod_list = []
for p in range(self.nb_frames):
mod_list.append(
model.marginal_model(
slice(p * self.nb_deriv * self.nb_dim, (p + 1) * self.nb_deriv * self.nb_dim)).lintrans_dyna(
A[:, p], b[:, p]))
# product
# prod = mod_list[0]
# for p in range(1, self.nb_frames):
# prod *= mod_list[p]
# Weighted product
prod = mod_list[0] * mod_list[1] * mod_list[2] * mod_list[2] * mod_list[2] * mod_list[2] * mod_list[2] * \
mod_list[2] * mod_list[2]
fig = rf.ml.poe_plot(self.nb_dim, mod_list, prod, repr.demos_x, show_demo_idx, repr.task_params)
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return prod
def _bi_poe(self, models: List, show_demo_idx: int) -> Tuple[GMM, GMM]:
"""
Product of Expert/Gaussian (PoE), which calculates the mixture distribution from multiple coordinates
:param models: list of pre_trained_models
:param show_demo_idx: index of the specific demo to be reproduced
:return: The product of experts
"""
model_l, model_r = models
prod_l = self._uni_poe(model_l, self.repr_l, show_demo_idx)
prod_r = self._uni_poe(model_r, self.repr_r, show_demo_idx)
return prod_l, prod_r
def fit(self) -> Tuple[HMM, HMM]:
model_l, model_r = super().fit()
return model_l, model_r
def reproduce(self, models: List, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')
model_l, model_r = models
prod_l, prod_r = self._bi_poe([model_l, model_r], show_demo_idx)
ctraj_l = self.repr_l._reproduce(model_l, prod_l, show_demo_idx, self.repr_l.demos_xdx[show_demo_idx][0])
ctraj_r = self.repr_r._reproduce(model_r, prod_r, show_demo_idx, self.repr_r.demos_xdx[show_demo_idx][0])
data_lst = [ctraj_l[:, :self.nb_dim], ctraj_r[:, :self.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Reproduced bimanual trajectories')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return ctraj_l, ctraj_r, prod_l, prod_r
def iterative_generate(self, model_l: HMM, model_r: HMM, ref_demo_idx: int, nb_iter=1) -> \
Tuple[ndarray, ndarray, GMM, GMM]:
beauty_print('generate trajectories from learned representation with new task parameters iteratively',
type='info')
vanilla_repr = TPGMMBi(self.demos_left_x, self.demos_right_x, task_params=self.task_params,
nb_states=self.nb_states, plot=self.plot, save=self.save, save_params=self.save_params)
vanilla_repr.task_params = self.task_params
vanilla_model_l, vanilla_model_r = vanilla_repr.fit()
vanilla_traj_l, vanilla_traj_r, _, _ = vanilla_repr.generate([vanilla_model_l, vanilla_model_r],
ref_demo_idx=ref_demo_idx)
# _, _, _, _, task_params_A_b_l = vanilla_repr.repr_l.get_A_b_from_task_params(task_params['left'])
# _, _, _, _, task_params_A_b_r = vanilla_repr.repr_r.get_A_b_from_task_params(task_params['right'])
#
# # Generate without coordination originally
# vanilla_prod_l = vanilla_repr.repr_l.poe(vanilla_model_l, ref_demo_idx, task_params_A_b_l)
# vanilla_prod_r = vanilla_repr.repr_r.poe(vanilla_model_r, ref_demo_idx, task_params_A_b_r)
# vanilla_traj_l = vanilla_repr.repr_l._reproduce(vanilla_model_l, vanilla_prod_l, ref_demo_idx,
# task_params['left']['start_xdx'])
# vanilla_traj_r = vanilla_repr.repr_r._reproduce(vanilla_model_r, vanilla_prod_r, ref_demo_idx,
# task_params['right']['start_xdx'])
data_lst = [vanilla_traj_l[:, :self.nb_dim], vanilla_traj_r[:, :self.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generate Trajectories without Coordination')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
# Add coordination to generate the trajectories iteratively
traj_l, traj_r = vanilla_traj_l, vanilla_traj_r
for i in range(nb_iter):
self.task_params['left']['traj'] = traj_l[:, :self.nb_dim]
_, ctraj_r, _, prod_r = self.conditional_generate(model_l, model_r, ref_demo_idx, leader='left')
self.task_params['right']['traj'] = traj_r[:, :self.nb_dim]
_, ctraj_l, _, prod_l = self.conditional_generate(model_l, model_r, ref_demo_idx, leader='right')
traj_l, traj_r = ctraj_l, ctraj_r
data_lst = [traj_l[:, :self.nb_dim], traj_r[:, :self.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories in synergistic manner,'
' iteration: {}-th'.format(i + 1))
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return ctraj_l, ctraj_r, prod_l, prod_r
def conditional_generate(self, model_l: HMM, model_r: HMM, ref_demo_idx: int, leader: str) -> \
Tuple[ndarray, ndarray, None, GMM]:
follower = 'left' if leader == 'right' else 'right'
leader_traj = self.task_params[leader]['traj']
models = {'left': model_l, 'right': model_r}
reprs = {'left': self.repr_l, 'right': self.repr_r}
beauty_print('generate the {} trajectory from learned representation conditioned on the {} trajectory'.format(
follower, leader), type='info')
A, b, index_list = self._get_dyna_A_b(models[follower], reprs[follower], ref_demo_idx,
task_params=self.task_params[follower])
b[:, 2, :self.nb_dim] = leader_traj[index_list, :self.nb_dim]
follower_prod = self._uni_poe(models[follower], reprs[follower], ref_demo_idx, task_params={'A': A, 'b': b})
follower_traj = reprs[follower]._reproduce(models[follower], follower_prod, ref_demo_idx,
self.task_params[follower]['start_xdx'])
data_lst = [leader_traj[:, :self.nb_dim], follower_traj[:, :self.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories in leader-follower manner')
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
plt.show()
return leader_traj, follower_traj, None, follower_prod
def generate(self, models: List, ref_demo_idx: int, leader: str = None):
model_l, model_r = models
if leader is None:
return self.iterative_generate(model_l, model_r, ref_demo_idx)
else:
return self.conditional_generate(model_l, model_r, ref_demo_idx, leader)
class TPGMM_RPAll(TPGMM_RPRepr, TPGMM_RPCtrl):
"""
TPGMM for bimanual coordination in both representation and LQR controller
"""
def __init__(self, demos_left_x, demos_right_x, nb_states: int = 4, reg: float = 1e-3, horizon: int = 150,
plot: bool = False, save: bool = False, save_params: dict = None, **kwargs):
TPGMM_RPRepr.__init__(self, demos_left_x, demos_right_x, nb_states, reg, horizon, plot=plot, save=save,
save_params=save_params, **kwargs)
self.demos_com_x = self.get_rel_demos()
self.repr_c = TPGMM(self.demos_com_x, nb_states=nb_states, reg=reg, horizon=horizon, plot=plot, save=save,
save_params=save_params, **kwargs)
def fit(self):
return TPGMM_RPCtrl.fit(self)
def reproduce(self, model_l: HMM, model_r: HMM, model_c: HMM, show_demo_idx: int) -> Tuple[
ndarray, ndarray, GMM, GMM]:
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')
prod_l, prod_r = self._bi_poe(model_l, model_r, show_demo_idx)
prod_c = self.repr_c.poe(model_c, show_demo_idx=show_demo_idx)
# Coordinated trajectory
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx)
return ctraj_l, ctraj_r, prod_l, prod_r
def generate(self, model_l: HMM, model_r: HMM, model_c: HMM, ref_demo_idx: int, task_params: dict,
leader: str = None):
# TODO
if leader is None:
_, _, prod_l, prod_r = self.iterative_generate(model_l, model_r, ref_demo_idx, task_params)
prod_c = self.repr_c.poe(model_c, show_demo_idx=ref_demo_idx)
traj_l, traj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, ref_demo_idx)
return traj_l, traj_r, None, None
else:
return self.conditional_generate(model_l, model_r, ref_demo_idx, task_params, leader)