itsMarco-G commited on
Commit
858f68f
·
0 Parent(s):

Add movement scheduler and integrate into tracker

Browse files
Files changed (1) hide show
  1. reachy_phone_home/movements.py +196 -0
reachy_phone_home/movements.py ADDED
@@ -0,0 +1,196 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Situation-based movement library for Reachy Mini."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from dataclasses import dataclass
6
+ import random
7
+ import time
8
+ from typing import Callable, Dict, List, Optional
9
+
10
+ from reachy_mini import ReachyMini
11
+ from reachy_mini.motion.recorded_move import RecordedMove, RecordedMoves
12
+
13
+
14
+ @dataclass(frozen=True)
15
+ class Movement:
16
+ name: str
17
+ run: Callable[[], None]
18
+
19
+
20
+ class SituationMovements:
21
+ """Expose recorded moves grouped by situation."""
22
+
23
+ EMOTIONS_DATASET = "pollen-robotics/reachy-mini-emotions-library"
24
+ DANCES_DATASET = "pollen-robotics/reachy-mini-dances-library"
25
+
26
+ def __init__(self, reachy: ReachyMini) -> None:
27
+ self.reachy = reachy
28
+ self._emotions: Optional[RecordedMoves] = None
29
+ self._dances: Optional[RecordedMoves] = None
30
+
31
+ self._groups: Dict[str, List[str]] = {
32
+ "ambient": [
33
+ "amazed1",
34
+ "attentive1",
35
+ "attentive2",
36
+ "cheerful1",
37
+ "curious1",
38
+ "helpful1",
39
+ "impatient1",
40
+ ],
41
+ "good_job": [
42
+ "dance1",
43
+ "grateful1",
44
+ "success1",
45
+ "proud1",
46
+ ],
47
+ "checking": [
48
+ "confused1",
49
+ ],
50
+ "bad": [
51
+ "disgusted1",
52
+ "displeased1",
53
+ "displeased2",
54
+ ],
55
+ "sad": [
56
+ "downcast1",
57
+ "lonely1",
58
+ ],
59
+ "response_yes": [
60
+ "enthusiastic1",
61
+ "inquiring1",
62
+ "inquiring2",
63
+ "welcoming2",
64
+ "understanding2",
65
+ ],
66
+ "response_no": [
67
+ "uncertain1",
68
+ "no1",
69
+ ],
70
+ }
71
+
72
+ self._dance_only = {"dance1"}
73
+
74
+ def _load_emotions(self) -> None:
75
+ if self._emotions is None:
76
+ self._emotions = RecordedMoves(self.EMOTIONS_DATASET)
77
+
78
+ def _load_dances(self) -> None:
79
+ if self._dances is None:
80
+ self._dances = RecordedMoves(self.DANCES_DATASET)
81
+
82
+ def _get_move(self, name: str) -> Optional[RecordedMove]:
83
+ if name in self._dance_only:
84
+ self._load_dances()
85
+ assert self._dances is not None
86
+ if name in self._dances.list_moves():
87
+ return self._dances.get(name)
88
+ return None
89
+
90
+ self._load_emotions()
91
+ assert self._emotions is not None
92
+ if name in self._emotions.list_moves():
93
+ return self._emotions.get(name)
94
+ return None
95
+
96
+ def _as_movements(self, names: List[str]) -> List[Movement]:
97
+ movements: List[Movement] = []
98
+ for name in names:
99
+ move = self._get_move(name)
100
+ if move is None:
101
+ continue
102
+ movements.append(
103
+ Movement(
104
+ name=name,
105
+ run=lambda m=move: self.reachy.play_move(
106
+ m, initial_goto_duration=0.6, sound=False
107
+ ),
108
+ )
109
+ )
110
+ return movements
111
+
112
+ def ambient(self) -> List[Movement]:
113
+ return self._as_movements(self._groups["ambient"])
114
+
115
+ def good_job(self) -> List[Movement]:
116
+ return self._as_movements(self._groups["good_job"])
117
+
118
+ def checking(self) -> List[Movement]:
119
+ return self._as_movements(self._groups["checking"])
120
+
121
+ def bad(self) -> List[Movement]:
122
+ return self._as_movements(self._groups["bad"])
123
+
124
+ def sad(self) -> List[Movement]:
125
+ return self._as_movements(self._groups["sad"])
126
+
127
+ def response_yes(self) -> List[Movement]:
128
+ return self._as_movements(self._groups["response_yes"])
129
+
130
+ def response_no(self) -> List[Movement]:
131
+ return self._as_movements(self._groups["response_no"])
132
+
133
+ def list_group(self, group: str) -> List[Movement]:
134
+ if group not in self._groups:
135
+ raise ValueError(f"Unknown group: {group}")
136
+ return self._as_movements(self._groups[group])
137
+
138
+
139
+ class MovementScheduler:
140
+ """Trigger situation movements based on heartbeat and phone-use state."""
141
+
142
+ def __init__(
143
+ self,
144
+ movements: SituationMovements,
145
+ good_job_heartbeats: int = 3,
146
+ phone_use_bad_sec: float = 10.0,
147
+ phone_use_clear_sec: float = 3.0,
148
+ restore_head_duration: float = 0.6,
149
+ ) -> None:
150
+ self.movements = movements
151
+ self.good_job_heartbeats = int(good_job_heartbeats)
152
+ self.phone_use_bad_sec = float(phone_use_bad_sec)
153
+ self.phone_use_clear_sec = float(phone_use_clear_sec)
154
+ self.restore_head_duration = float(restore_head_duration)
155
+
156
+ self._good_job_streak = 0
157
+ self._phone_use_start: Optional[float] = None
158
+ self._phone_use_clear_start: Optional[float] = None
159
+
160
+ self._good_job_moves = movements.good_job()
161
+ self._bad_moves = movements.bad()
162
+
163
+ def _pick_and_run(self, pool: List[Movement]) -> None:
164
+ if not pool:
165
+ return
166
+ reachy = self.movements.reachy
167
+ prev_head = reachy.get_current_head_pose()
168
+ random.choice(pool).run()
169
+ reachy.goto_target(head=prev_head, duration=self.restore_head_duration)
170
+
171
+ def on_heartbeat(self, *, phone_tracked: bool, phone_use: bool, now: Optional[float] = None) -> None:
172
+ """Call on each heartbeat with current phone states."""
173
+ if now is None:
174
+ now = time.time()
175
+
176
+ if phone_tracked and not phone_use:
177
+ self._good_job_streak += 1
178
+ if self._good_job_streak >= self.good_job_heartbeats:
179
+ self._pick_and_run(self._good_job_moves)
180
+ self._good_job_streak = 0
181
+ else:
182
+ self._good_job_streak = 0
183
+
184
+ if phone_use:
185
+ self._phone_use_clear_start = None
186
+ if self._phone_use_start is None:
187
+ self._phone_use_start = now
188
+ elif now - self._phone_use_start >= self.phone_use_bad_sec:
189
+ self._pick_and_run(self._bad_moves)
190
+ self._phone_use_start = now
191
+ else:
192
+ if self._phone_use_clear_start is None:
193
+ self._phone_use_clear_start = now
194
+ elif now - self._phone_use_clear_start >= self.phone_use_clear_sec:
195
+ self._phone_use_start = None
196
+ self._phone_use_clear_start = None