Skip to content

Commit

Permalink
feat: Support the asyncio debug mode and apply aiomonitor-ng (#688)
Browse files Browse the repository at this point in the history
* feat: Add `[debug].{asyncio,enhanced-aiomonitor-task-info}` config flags
  in the manager, agent, and storage-proxy TOML files
* fix: Prevent overlapping of manager/agent aiomonitor ports by changing
  the default ports to have more distant values (50100, 50200).
* setup: Replace aiomonitor with aiomonitor-ng
* feat: Add aiomonitor support to storage-proxy (default port: 50300)

Backported-From: main
Backported-To: 22.03
  • Loading branch information
achimnol committed Aug 30, 2022
1 parent 3c82561 commit 3c45b9e
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 213 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ python_requirements(
source="requirements.txt",
module_mapping={
"aiodataloader-ng": ["aiodataloader"],
"aiomonitor-ng": ["aiomonitor"],
"attrs": ["attr", "attrs"],
"aiohttp-session": ["aiohttp_session"],
"pycryptodome": ["Crypto"],
Expand Down
1 change: 1 addition & 0 deletions changes/688.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enable the asyncio debug mode when our debug mode is enabled (e.g., `--debug`) and replace `aiomonitor` with `aiomonitor-ng`
9 changes: 9 additions & 0 deletions configs/agent/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ scaling-group = "default"
# when the agent starts up. [default: false]
# skip-manager-detection = false

# The port number for aiomonitor.
# aiomonitor-port = 50200

# This directory is used to store the last registry file.
# var-base-path = "/var/lib/backend.ai"

Expand Down Expand Up @@ -228,6 +231,12 @@ enabled = false
# launches, due to bugs in initialization steps such as jail.
skip-container-deletion = false

# Enable or disable the asyncio debug mode.
asyncio = false

# Use the custom task factory to get more detailed asyncio task information; this may have performance penalties
enhanced-aiomonitor-task-info = false

# Include debug-level logs for internal events.
log-events = false

Expand Down
18 changes: 17 additions & 1 deletion configs/manager/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ hide-agents = false
# uses an importer image from a private registry.
importer-image = "lablup/importer:manylinux2010"

# The maximum size of websocket messages in bytes.
# max-wsmsg-size = 16777216 # 16 MiB

# The starting port number for aiomonitor.
# Since the manager is composed of multiple processes, each process will be exposed
# via the port number of this base port number + pidx.
# aiomonitor-port = 50100


[docker-registry]
# Enable or disable SSL certificate verification when accessing Docker registries.
Expand Down Expand Up @@ -160,4 +168,12 @@ ssl-verify = true

[debug]
enabled = false
periodic-sync-stats = false # periodically sync container stat from Redis to the kernels.last_stat column.

# Enable or disable the asyncio debug mode.
asyncio = false

# Use the custom task factory to get more detailed asyncio task information; this may have performance penalties
enhanced-aiomonitor-task-info = false

# Periodically sync container stat from Redis to the kernels.last_stat column.
periodic-sync-stats = false
11 changes: 11 additions & 0 deletions configs/storage-proxy/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ session-expire = "1d"
# user = 1000
# group = 1000

# The starting port number for aiomonitor.
# Since the storage-proxy is composed of multiple processes, each process will be exposed
# via the port number of this base port number + pidx.
# aiomonitor-port = 50300


[api.client]
# Client-facing API
Expand All @@ -57,6 +62,12 @@ secret = "some-secret-shared-with-manager"
# Enable the debug mode by overriding the global loglevel and "ai.backend" loglevel.
enabled = false

# Enable or disable the asyncio debug mode.
asyncio = false

# Use the custom task factory to get more detailed asyncio task information; this may have performance penalties
enhanced-aiomonitor-task-info = false


[logging]
# One of: "NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
Expand Down
289 changes: 153 additions & 136 deletions python.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ aiohttp_cors~=0.7
aiohttp_sse>=2.0
aiohttp_session[aioredis]~=2.11
aiodns>=3.0
aiomonitor~=0.4.5
aiomonitor-ng~=0.5.1
aioredis[hiredis]~=2.0.1
aiosqlite~=0.17.0
aiotools~=1.5.9
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
),
t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"),
t.Key("skip-manager-detection", default=False): t.ToBool,
t.Key("aiomonitor-port", default=50102): t.Int[1:65535],
t.Key("aiomonitor-port", default=50200): t.Int[1:65535],
}
).allow_extra("*"),
t.Key("container"): t.Dict(
Expand Down Expand Up @@ -74,6 +74,8 @@
t.Key("debug"): t.Dict(
{
t.Key("enabled", default=False): t.Bool,
t.Key("asyncio", default=False): t.Bool,
t.Key("enhanced-aiomonitor-task-info", default=False): t.Bool,
t.Key("skip-container-deletion", default=False): t.Bool,
t.Key("log-stats", default=False): t.Bool,
t.Key("log-kernel-config", default=False): t.Bool,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,13 @@ async def server_main(
local_config["plugins"] = await etcd.get_prefix_dict("config/plugins/accelerator")

# Start aiomonitor.
# Port is set by config (default=50002).
# Port is set by config (default=50200).
loop.set_debug(local_config["debug"]["asyncio"])
monitor = aiomonitor.Monitor(
loop,
port=local_config["agent"]["aiomonitor-port"],
console_enabled=False,
hook_task_factory=local_config["debug"]["enhanced-aiomonitor-task-info"],
)
monitor.prompt = "monitor (agent) >>> "
monitor.start()
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@
t.Key("hide-agents", default=False): t.Bool,
t.Key("importer-image", default="lablup/importer:manylinux2010"): t.String,
t.Key("max-wsmsg-size", default=16 * (2**20)): t.ToInt, # default: 16 MiB
t.Key("aiomonitor-port", default=50001): t.Int[1:65535],
t.Key("aiomonitor-port", default=50100): t.Int[1:65535],
}
).allow_extra("*"),
t.Key("docker-registry"): t.Dict(
Expand All @@ -267,6 +267,8 @@
t.Key("debug"): t.Dict(
{
t.Key("enabled", default=False): t.ToBool,
t.Key("asyncio", default=False): t.Bool,
t.Key("enhanced-aiomonitor-task-info", default=False): t.Bool,
t.Key("log-events", default=False): t.ToBool,
t.Key("log-scheduler-ticks", default=False): t.ToBool,
t.Key("periodic-sync-stats", default=False): t.ToBool,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,13 @@ async def server_main(
root_ctx: RootContext = root_app["_root.context"]

# Start aiomonitor.
# Port is set by config (default=50001).
# Port is set by config (default=50100 + pidx).
loop.set_debug(root_ctx.local_config["debug"]["asyncio"])
m = aiomonitor.Monitor(
loop,
port=root_ctx.local_config["manager"]["aiomonitor-port"] + pidx,
console_enabled=False,
hook_task_factory=root_ctx.local_config["debug"]["enhanced-aiomonitor-task-info"],
)
m.prompt = f"monitor (manager[{pidx}@{os.getpid()}]) >>> "
m.start()
Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/storage/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
t.Key("group", default=None): tx.GroupID(
default_gid=_file_perm.st_gid,
),
t.Key("aiomonitor-port", default=50300): t.Int[1:65535],
},
),
t.Key("logging"): logging_config_iv,
Expand Down Expand Up @@ -71,6 +72,8 @@
t.Key("debug"): t.Dict(
{
t.Key("enabled", default=False): t.ToBool,
t.Key("asyncio", default=False): t.Bool,
t.Key("enhanced-aiomonitor-task-info", default=False): t.Bool,
},
).allow_extra("*"),
},
Expand Down
154 changes: 83 additions & 71 deletions src/ai/backend/storage/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import pwd
import ssl
import sys
from contextlib import closing
from pathlib import Path
from pprint import pformat, pprint
from typing import Any, AsyncIterator, Sequence

import aiomonitor
import aiotools
import click
from aiohttp import web
Expand Down Expand Up @@ -50,81 +52,91 @@ async def server_main(
_args: Sequence[Any],
) -> AsyncIterator[None]:
local_config = _args[0]

etcd_credentials = None
if local_config["etcd"]["user"]:
etcd_credentials = {
"user": local_config["etcd"]["user"],
"password": local_config["etcd"]["password"],
}
scope_prefix_map = {
ConfigScopes.GLOBAL: "",
ConfigScopes.NODE: f"nodes/storage/{local_config['storage-proxy']['node-id']}",
}
etcd = AsyncEtcd(
local_config["etcd"]["addr"],
local_config["etcd"]["namespace"],
scope_prefix_map,
credentials=etcd_credentials,
loop.set_debug(local_config["debug"]["asyncio"])
m = aiomonitor.Monitor(
loop,
port=local_config["storage-proxy"]["aiomonitor-port"] + pidx,
console_enabled=False,
hook_task_factory=local_config["debug"]["enhanced-aiomonitor-task-info"],
)
ctx = Context(pid=os.getpid(), local_config=local_config, etcd=etcd)
client_api_app = await init_client_app(ctx)
manager_api_app = await init_manager_app(ctx)

client_ssl_ctx = None
manager_ssl_ctx = None
if local_config["api"]["client"]["ssl-enabled"]:
client_ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
client_ssl_ctx.load_cert_chain(
str(local_config["api"]["client"]["ssl-cert"]),
str(local_config["api"]["client"]["ssl-privkey"]),
m.prompt = f"monitor (storage-proxy[{pidx}@{os.getpid()}]) >>> "
m.start()

with closing(m):
etcd_credentials = None
if local_config["etcd"]["user"]:
etcd_credentials = {
"user": local_config["etcd"]["user"],
"password": local_config["etcd"]["password"],
}
scope_prefix_map = {
ConfigScopes.GLOBAL: "",
ConfigScopes.NODE: f"nodes/storage/{local_config['storage-proxy']['node-id']}",
}
etcd = AsyncEtcd(
local_config["etcd"]["addr"],
local_config["etcd"]["namespace"],
scope_prefix_map,
credentials=etcd_credentials,
)
if local_config["api"]["manager"]["ssl-enabled"]:
manager_ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
manager_ssl_ctx.load_cert_chain(
str(local_config["api"]["manager"]["ssl-cert"]),
str(local_config["api"]["manager"]["ssl-privkey"]),
ctx = Context(pid=os.getpid(), local_config=local_config, etcd=etcd)
client_api_app = await init_client_app(ctx)
manager_api_app = await init_manager_app(ctx)

client_ssl_ctx = None
manager_ssl_ctx = None
if local_config["api"]["client"]["ssl-enabled"]:
client_ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
client_ssl_ctx.load_cert_chain(
str(local_config["api"]["client"]["ssl-cert"]),
str(local_config["api"]["client"]["ssl-privkey"]),
)
if local_config["api"]["manager"]["ssl-enabled"]:
manager_ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
manager_ssl_ctx.load_cert_chain(
str(local_config["api"]["manager"]["ssl-cert"]),
str(local_config["api"]["manager"]["ssl-privkey"]),
)
client_api_runner = web.AppRunner(client_api_app)
manager_api_runner = web.AppRunner(manager_api_app)
await client_api_runner.setup()
await manager_api_runner.setup()
client_service_addr = local_config["api"]["client"]["service-addr"]
manager_service_addr = local_config["api"]["manager"]["service-addr"]
client_api_site = web.TCPSite(
client_api_runner,
str(client_service_addr.host),
client_service_addr.port,
backlog=1024,
reuse_port=True,
ssl_context=client_ssl_ctx,
)
client_api_runner = web.AppRunner(client_api_app)
manager_api_runner = web.AppRunner(manager_api_app)
await client_api_runner.setup()
await manager_api_runner.setup()
client_service_addr = local_config["api"]["client"]["service-addr"]
manager_service_addr = local_config["api"]["manager"]["service-addr"]
client_api_site = web.TCPSite(
client_api_runner,
str(client_service_addr.host),
client_service_addr.port,
backlog=1024,
reuse_port=True,
ssl_context=client_ssl_ctx,
)
manager_api_site = web.TCPSite(
manager_api_runner,
str(manager_service_addr.host),
manager_service_addr.port,
backlog=1024,
reuse_port=True,
ssl_context=manager_ssl_ctx,
)
await client_api_site.start()
await manager_api_site.start()
if os.geteuid() == 0:
uid = local_config["storage-proxy"]["user"]
gid = local_config["storage-proxy"]["group"]
os.setgroups(
[g.gr_gid for g in grp.getgrall() if pwd.getpwuid(uid).pw_name in g.gr_mem],
manager_api_site = web.TCPSite(
manager_api_runner,
str(manager_service_addr.host),
manager_service_addr.port,
backlog=1024,
reuse_port=True,
ssl_context=manager_ssl_ctx,
)
os.setgid(gid)
os.setuid(uid)
log.info("Changed process uid:gid to {}:{}", uid, gid)
log.info("Started service.")
try:
yield
finally:
log.info("Shutting down...")
await manager_api_runner.cleanup()
await client_api_runner.cleanup()
await client_api_site.start()
await manager_api_site.start()
if os.geteuid() == 0:
uid = local_config["storage-proxy"]["user"]
gid = local_config["storage-proxy"]["group"]
os.setgroups(
[g.gr_gid for g in grp.getgrall() if pwd.getpwuid(uid).pw_name in g.gr_mem],
)
os.setgid(gid)
os.setuid(uid)
log.info("Changed process uid:gid to {}:{}", uid, gid)
log.info("Started service.")
try:
yield
finally:
log.info("Shutting down...")
await manager_api_runner.cleanup()
await client_api_runner.cleanup()


@click.group(invoke_without_command=True)
Expand Down

0 comments on commit 3c45b9e

Please sign in to comment.