forked from ringhyacinth/Star-Office-UI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoffice-agent-push.py
More file actions
305 lines (260 loc) · 11.7 KB
/
office-agent-push.py
File metadata and controls
305 lines (260 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#!/usr/bin/env python3
"""
海辛办公室 - Agent 状态主动推送脚本
用法:
1. 填入下面的 JOIN_KEY(你从海辛那里拿到的一次性 join key)
2. 填入 AGENT_NAME(你想要在办公室里显示的名字)
3. 运行:python office-agent-push.py
4. 脚本会自动先 join(首次运行),然后每 30s 向海辛办公室推送一次你的当前状态
"""
import json
import os
import time
import sys
from datetime import datetime
# === 你需要填入的信息 ===
JOIN_KEY = "" # 必填:你的一次性 join key
AGENT_NAME = "" # 必填:你在办公室里的名字
OFFICE_URL = "https://office.hyacinth.im" # 海辛办公室地址(一般不用改)
# === 推送配置 ===
PUSH_INTERVAL_SECONDS = 15 # 每隔多少秒推送一次(更实时)
STATUS_ENDPOINT = "/status"
JOIN_ENDPOINT = "/join-agent"
PUSH_ENDPOINT = "/agent-push"
# 自动状态守护:当本地状态文件不存在或长期不更新时,自动回 idle,避免“假工作中”
STALE_STATE_TTL_SECONDS = int(os.environ.get("OFFICE_STALE_STATE_TTL", "600"))
# 本地状态存储(记住上次 join 拿到的 agentId)
STATE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "office-agent-state.json")
# 优先读取本机 OpenClaw 工作区的状态文件(更贴合 AGENTS.md 的工作流)
# 支持自动发现,减少对方手动配置成本,且避免硬编码绝对路径:
# - 优先使用环境变量 OPENCLAW_HOME / OPENCLAW_WORKSPACE_DIR
# - 其次使用当前用户 HOME/.openclaw
# - 再回落到当前工作目录与脚本所在目录
OPENCLAW_HOME = os.environ.get("OPENCLAW_HOME") or os.path.join(os.path.expanduser("~"), ".openclaw")
OPENCLAW_WORKSPACE_DIR = os.environ.get("OPENCLAW_WORKSPACE_DIR") or os.path.join(OPENCLAW_HOME, "workspace")
DEFAULT_STATE_CANDIDATES = [
os.path.join(OPENCLAW_WORKSPACE_DIR, "star-office-ui", "state.json"),
os.path.join(OPENCLAW_WORKSPACE_DIR, "state.json"),
"/root/.openclaw/workspace/Star-Office-UI/state.json", # 当前仓库(大小写精确)
"/root/.openclaw/workspace/star-office-ui/state.json", # 历史/兼容路径
"/root/.openclaw/workspace/state.json",
os.path.join(os.getcwd(), "state.json"),
os.path.join(os.path.dirname(os.path.abspath(__file__)), "state.json"),
]
# 如果对方本地 /status 需要鉴权,可在这里填写 token(或通过环境变量 OFFICE_LOCAL_STATUS_TOKEN)
LOCAL_STATUS_TOKEN = os.environ.get("OFFICE_LOCAL_STATUS_TOKEN", "")
LOCAL_STATUS_URL = os.environ.get("OFFICE_LOCAL_STATUS_URL", "http://127.0.0.1:19000/status")
# 可选:直接指定本地状态文件路径(最简单方案:绕过 /status 鉴权)
LOCAL_STATE_FILE = os.environ.get("OFFICE_LOCAL_STATE_FILE", "")
VERBOSE = os.environ.get("OFFICE_VERBOSE", "0") in {"1", "true", "TRUE", "yes", "YES"}
def load_local_state():
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {
"agentId": None,
"joined": False,
"joinKey": JOIN_KEY,
"agentName": AGENT_NAME
}
def save_local_state(data):
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def normalize_state(s):
"""兼容不同本地状态词,并映射到办公室识别状态。"""
s = (s or "").strip().lower()
if s in {"writing", "researching", "executing", "syncing", "error", "idle"}:
return s
if s in {"working", "busy", "write"}:
return "writing"
if s in {"run", "running", "execute", "exec"}:
return "executing"
if s in {"research", "search"}:
return "researching"
if s in {"sync"}:
return "syncing"
return "idle"
def map_detail_to_state(detail, fallback_state="idle"):
"""当只有 detail 时,用关键词推断状态(贴近 AGENTS.md 的办公区逻辑)。"""
d = (detail or "").lower()
if any(k in d for k in ["报错", "error", "bug", "异常", "报警"]):
return "error"
if any(k in d for k in ["同步", "sync", "备份"]):
return "syncing"
if any(k in d for k in ["调研", "research", "搜索", "查资料"]):
return "researching"
if any(k in d for k in ["执行", "run", "推进", "处理任务", "工作中", "writing"]):
return "writing"
if any(k in d for k in ["待命", "休息", "idle", "完成", "done"]):
return "idle"
return fallback_state
def _state_age_seconds(data):
try:
ts = (data or {}).get("updated_at")
if not ts:
return None
dt = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
if dt.tzinfo is not None:
from datetime import timezone
return (datetime.now(timezone.utc) - dt.astimezone(timezone.utc)).total_seconds()
return (datetime.now() - dt).total_seconds()
except Exception:
return None
def fetch_local_status():
"""读取本地状态:
1) 优先 state.json(符合 AGENTS.md:任务前切 writing,完成后切 idle)
2) 其次尝试本地 HTTP /status
3) 最后 fallback idle
额外防抖:如果本地状态更新时间超过 STALE_STATE_TTL_SECONDS,自动视为 idle。
"""
# 1) 读本地 state.json(优先读取显式指定路径,其次自动发现)
candidate_files = []
if LOCAL_STATE_FILE:
candidate_files.append(LOCAL_STATE_FILE)
for fp in DEFAULT_STATE_CANDIDATES:
if fp not in candidate_files:
candidate_files.append(fp)
for fp in candidate_files:
try:
if fp and os.path.exists(fp):
with open(fp, "r", encoding="utf-8") as f:
data = json.load(f)
# 只接受“状态文件”结构;避免误把 office-agent-state.json(仅缓存 agentId)当状态源
if not isinstance(data, dict):
continue
has_state = "state" in data
has_detail = "detail" in data
if (not has_state) and (not has_detail):
continue
state = normalize_state(data.get("state", "idle"))
detail = data.get("detail", "") or ""
# detail 兜底纠偏,确保“工作/休息/报警”能正确落区
state = map_detail_to_state(detail, fallback_state=state)
# 防止状态文件久未更新仍停留在 working 态
age = _state_age_seconds(data)
if age is not None and age > STALE_STATE_TTL_SECONDS:
state = "idle"
detail = f"本地状态超过{STALE_STATE_TTL_SECONDS}s未更新,自动回待命"
if VERBOSE:
print(f"[status-source:file] path={fp} state={state} detail={detail[:60]}")
return {"state": state, "detail": detail}
except Exception:
pass
# 2) 尝试本地 /status(可能需要鉴权)
try:
import requests
headers = {}
if LOCAL_STATUS_TOKEN:
headers["Authorization"] = f"Bearer {LOCAL_STATUS_TOKEN}"
r = requests.get(LOCAL_STATUS_URL, headers=headers, timeout=5)
if r.status_code == 200:
data = r.json()
state = normalize_state(data.get("state", "idle"))
detail = data.get("detail", "") or ""
state = map_detail_to_state(detail, fallback_state=state)
age = _state_age_seconds(data)
if age is not None and age > STALE_STATE_TTL_SECONDS:
state = "idle"
detail = f"本地/status 超过{STALE_STATE_TTL_SECONDS}s未更新,自动回待命"
if VERBOSE:
print(f"[status-source:http] url={LOCAL_STATUS_URL} state={state} detail={detail[:60]}")
return {"state": state, "detail": detail}
# 如果 401,说明需要 token
if r.status_code == 401:
return {"state": "idle", "detail": "本地/status需要鉴权(401),请设置 OFFICE_LOCAL_STATUS_TOKEN"}
except Exception:
pass
# 3) 默认 fallback
if VERBOSE:
print("[status-source:fallback] state=idle detail=待命中")
return {"state": "idle", "detail": "待命中"}
def do_join(local):
import requests
payload = {
"name": local.get("agentName", AGENT_NAME),
"joinKey": local.get("joinKey", JOIN_KEY),
"state": "idle",
"detail": "刚刚加入"
}
r = requests.post(f"{OFFICE_URL}{JOIN_ENDPOINT}", json=payload, timeout=10)
if r.status_code in (200, 201):
data = r.json()
if data.get("ok"):
local["joined"] = True
local["agentId"] = data.get("agentId")
save_local_state(local)
print(f"✅ 已加入海辛办公室,agentId={local['agentId']}")
return True
print(f"❌ 加入失败:{r.text}")
return False
def do_push(local, status_data):
import requests
payload = {
"agentId": local.get("agentId"),
"joinKey": local.get("joinKey", JOIN_KEY),
"state": status_data.get("state", "idle"),
"detail": status_data.get("detail", ""),
"name": local.get("agentName", AGENT_NAME)
}
r = requests.post(f"{OFFICE_URL}{PUSH_ENDPOINT}", json=payload, timeout=10)
if r.status_code in (200, 201):
data = r.json()
if data.get("ok"):
area = data.get("area", "breakroom")
print(f"✅ 状态已同步,当前区域={area}")
return True
# 403/404:拒绝/移除 → 停止推送
if r.status_code in (403, 404):
msg = ""
try:
msg = (r.json() or {}).get("msg", "")
except Exception:
msg = r.text
print(f"⚠️ 访问拒绝或已移出房间({r.status_code}),停止推送:{msg}")
local["joined"] = False
local["agentId"] = None
save_local_state(local)
sys.exit(1)
print(f"⚠️ 推送失败:{r.text}")
return False
def main():
local = load_local_state()
# Startup hint for state source and URL (helps with port/state issues, e.g. issue #31)
if LOCAL_STATE_FILE:
print(f"State file: {LOCAL_STATE_FILE}")
else:
first_existing = next((p for p in DEFAULT_STATE_CANDIDATES if p and os.path.exists(p)), None)
if first_existing:
print(f"State file (auto): {first_existing}")
else:
print("State file: auto-discover (set OFFICE_LOCAL_STATE_FILE if state not found)")
print(f"Local status URL: {LOCAL_STATUS_URL} (set OFFICE_LOCAL_STATUS_URL if backend uses another port)")
# 先确认配置是否齐全
if not JOIN_KEY or not AGENT_NAME:
print("❌ 请先在脚本开头填入 JOIN_KEY 和 AGENT_NAME")
sys.exit(1)
# 如果之前没 join,先 join
if not local.get("joined") or not local.get("agentId"):
ok = do_join(local)
if not ok:
sys.exit(1)
# 持续推送
print(f"🚀 开始持续推送状态,间隔={PUSH_INTERVAL_SECONDS}秒")
print("🧭 状态逻辑:任务中→工作区;待命/完成→休息区;异常→bug区")
print("🔐 若本地 /status 返回 Unauthorized(401),请设置环境变量:OFFICE_LOCAL_STATUS_TOKEN 或 OFFICE_LOCAL_STATUS_URL")
try:
while True:
try:
status_data = fetch_local_status()
do_push(local, status_data)
except Exception as e:
print(f"⚠️ 推送异常:{e}")
time.sleep(PUSH_INTERVAL_SECONDS)
except KeyboardInterrupt:
print("\n👋 停止推送")
sys.exit(0)
if __name__ == "__main__":
main()