Skip to content

speedy startup #649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/warnet/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
},
}

LOGGING_CRD_COMMANDS = [
"helm repo add prometheus-community https://prometheus-community.github.io/helm-charts",
"helm repo update",
"helm upgrade --install prometheus-operator-crds prometheus-community/prometheus-operator-crds",
]

# Helm commands for logging setup
# TODO: also lots of hardcode stuff in these helm commands, will need to fix this when moving to helm charts
LOGGING_HELM_COMMANDS = [
Expand Down
17 changes: 13 additions & 4 deletions src/warnet/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import zipapp
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -112,10 +113,18 @@ def stop_scenario(scenario_name):


def stop_all_scenarios(scenarios):
"""Stop all active scenarios using Helm"""
with console.status("[bold yellow]Stopping all scenarios...[/bold yellow]"):
for scenario in scenarios:
stop_scenario(scenario)
"""Stop all active scenarios in parallel using multiprocessing"""

def stop_single(scenario):
stop_scenario(scenario)
return f"Stopped scenario: {scenario}"

with console.status("[bold yellow]Stopping all scenarios...[/bold yellow]"), Pool() as pool:
results = pool.map(stop_single, scenarios)

for result in results:
console.print(f"[bold green]{result}[/bold green]")

console.print("[bold green]All scenarios have been stopped.[/bold green]")


Expand Down
199 changes: 137 additions & 62 deletions src/warnet/deploy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import subprocess
import sys
import tempfile
from multiprocessing import Process
from pathlib import Path
from typing import Optional

Expand All @@ -15,6 +16,7 @@
FORK_OBSERVER_CHART,
HELM_COMMAND,
INGRESS_HELM_COMMANDS,
LOGGING_CRD_COMMANDS,
LOGGING_HELM_COMMANDS,
LOGGING_NAMESPACE,
NAMESPACES_CHART_LOCATION,
Expand Down Expand Up @@ -75,17 +77,47 @@ def _deploy(directory, debug, namespace, to_all_users):

if to_all_users:
namespaces = get_namespaces_by_type(WARGAMES_NAMESPACE_PREFIX)
processes = []
for namespace in namespaces:
deploy(directory, debug, namespace.metadata.name, False)
p = Process(target=deploy, args=(directory, debug, namespace.metadata.name, False))
p.start()
processes.append(p)
for p in processes:
p.join()
return

if (directory / NETWORK_FILE).exists():
dl = deploy_logging_stack(directory, debug)
deploy_network(directory, debug, namespace=namespace)
df = deploy_fork_observer(directory, debug)
if dl | df:
deploy_ingress(debug)
deploy_caddy(directory, debug)
processes = []
# Deploy logging CRD first to avoid synchronisation issues
deploy_logging_crd(directory, debug)

logging_process = Process(target=deploy_logging_stack, args=(directory, debug))
logging_process.start()
processes.append(logging_process)

network_process = Process(target=deploy_network, args=(directory, debug, namespace))
network_process.start()

ingress_process = Process(target=deploy_ingress, args=(directory, debug))
ingress_process.start()
processes.append(ingress_process)

caddy_process = Process(target=deploy_caddy, args=(directory, debug))
caddy_process.start()
processes.append(caddy_process)

# Wait for the network process to complete
network_process.join()

# Start the fork observer process immediately after network process completes
fork_observer_process = Process(target=deploy_fork_observer, args=(directory, debug))
fork_observer_process.start()
processes.append(fork_observer_process)

# Wait for all other processes to complete
for p in processes:
p.join()

elif (directory / NAMESPACES_FILE).exists():
deploy_namespaces(directory)
else:
Expand Down Expand Up @@ -118,11 +150,30 @@ def check_logging_required(directory: Path):
return False


def deploy_logging_crd(directory: Path, debug: bool) -> bool:
"""
This function exists so we can parallelise the rest of the loggin stack
installation
"""
if not check_logging_required(directory):
return False

click.echo(
"Found collectLogs or metricsExport in network definition, Deploying logging stack CRD"
)

for command in LOGGING_CRD_COMMANDS:
if not stream_command(command):
print(f"Failed to run Helm command: {command}")
return False
return True


def deploy_logging_stack(directory: Path, debug: bool) -> bool:
if not check_logging_required(directory):
return False

click.echo("Found collectLogs or metricsExport in network definition, Deploying logging stack")
click.echo("Deploying logging stack")

for command in LOGGING_HELM_COMMANDS:
if not stream_command(command):
Expand All @@ -144,7 +195,7 @@ def deploy_caddy(directory: Path, debug: bool):
if not network_file.get(name, {}).get("enabled", False):
return

cmd = f"{HELM_COMMAND} {name} {CADDY_CHART} --namespace {namespace}"
cmd = f"{HELM_COMMAND} {name} {CADDY_CHART} --namespace {namespace} --create-namespace"
if debug:
cmd += " --debug"

Expand All @@ -156,7 +207,15 @@ def deploy_caddy(directory: Path, debug: bool):
click.echo("\nTo access the warnet dashboard run:\n warnet dashboard")


def deploy_ingress(debug: bool):
def deploy_ingress(directory: Path, debug: bool):
# Deploy ingress if either logging or fork observer is enabled
network_file_path = directory / NETWORK_FILE
with network_file_path.open() as f:
network_file = yaml.safe_load(f)
fo_enabled = network_file.get("fork_observer", {}).get("enabled", False)
logging_enabled = check_logging_required(directory)
if not (fo_enabled or logging_enabled):
return
click.echo("Deploying ingress controller")

for command in INGRESS_HELM_COMMANDS:
Expand Down Expand Up @@ -231,41 +290,49 @@ def deploy_fork_observer(directory: Path, debug: bool) -> bool:

def deploy_network(directory: Path, debug: bool = False, namespace: Optional[str] = None):
network_file_path = directory / NETWORK_FILE
defaults_file_path = directory / DEFAULTS_FILE

namespace = get_default_namespace_or(namespace)

with network_file_path.open() as f:
network_file = yaml.safe_load(f)

processes = []
for node in network_file["nodes"]:
click.echo(f"Deploying node: {node.get('name')}")
try:
temp_override_file_path = ""
node_name = node.get("name")
node_config_override = {k: v for k, v in node.items() if k != "name"}

cmd = f"{HELM_COMMAND} {node_name} {BITCOIN_CHART_LOCATION} --namespace {namespace} -f {defaults_file_path}"
if debug:
cmd += " --debug"

if node_config_override:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False
) as temp_file:
yaml.dump(node_config_override, temp_file)
temp_override_file_path = Path(temp_file.name)
cmd = f"{cmd} -f {temp_override_file_path}"

if not stream_command(cmd):
click.echo(f"Failed to run Helm command: {cmd}")
return
except Exception as e:
click.echo(f"Error: {e}")
p = Process(target=deploy_single_node, args=(node, directory, debug, namespace))
p.start()
processes.append(p)

for p in processes:
p.join()


def deploy_single_node(node, directory: Path, debug: bool, namespace: str):
defaults_file_path = directory / DEFAULTS_FILE
click.echo(f"Deploying node: {node.get('name')}")
temp_override_file_path = ""
try:
node_name = node.get("name")
node_config_override = {k: v for k, v in node.items() if k != "name"}

defaults_file_path = directory / DEFAULTS_FILE
cmd = f"{HELM_COMMAND} {node_name} {BITCOIN_CHART_LOCATION} --namespace {namespace} -f {defaults_file_path}"
if debug:
cmd += " --debug"

if node_config_override:
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_file:
yaml.dump(node_config_override, temp_file)
temp_override_file_path = Path(temp_file.name)
cmd = f"{cmd} -f {temp_override_file_path}"

if not stream_command(cmd):
click.echo(f"Failed to run Helm command: {cmd}")
return
finally:
if temp_override_file_path:
Path(temp_override_file_path).unlink()
except Exception as e:
click.echo(f"Error: {e}")
return
finally:
if temp_override_file_path:
Path(temp_override_file_path).unlink()


def deploy_namespaces(directory: Path):
Expand All @@ -284,32 +351,40 @@ def deploy_namespaces(directory: Path):
)
return

processes = []
for namespace in namespaces_file["namespaces"]:
click.echo(f"Deploying namespace: {namespace.get('name')}")
try:
temp_override_file_path = ""
namespace_name = namespace.get("name")
namespace_config_override = {k: v for k, v in namespace.items() if k != "name"}

cmd = f"{HELM_COMMAND} {namespace_name} {NAMESPACES_CHART_LOCATION} -f {defaults_file_path}"

if namespace_config_override:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False
) as temp_file:
yaml.dump(namespace_config_override, temp_file)
temp_override_file_path = Path(temp_file.name)
cmd = f"{cmd} -f {temp_override_file_path}"

if not stream_command(cmd):
click.echo(f"Failed to run Helm command: {cmd}")
return
except Exception as e:
click.echo(f"Error: {e}")
p = Process(target=deploy_single_namespace, args=(namespace, defaults_file_path))
p.start()
processes.append(p)

for p in processes:
p.join()


def deploy_single_namespace(namespace, defaults_file_path: Path):
click.echo(f"Deploying namespace: {namespace.get('name')}")
temp_override_file_path = ""
try:
namespace_name = namespace.get("name")
namespace_config_override = {k: v for k, v in namespace.items() if k != "name"}

cmd = f"{HELM_COMMAND} {namespace_name} {NAMESPACES_CHART_LOCATION} -f {defaults_file_path}"

if namespace_config_override:
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_file:
yaml.dump(namespace_config_override, temp_file)
temp_override_file_path = Path(temp_file.name)
cmd = f"{cmd} -f {temp_override_file_path}"

if not stream_command(cmd):
click.echo(f"Failed to run Helm command: {cmd}")
return
finally:
if temp_override_file_path:
temp_override_file_path.unlink()
except Exception as e:
click.echo(f"Error: {e}")
return
finally:
if temp_override_file_path:
Path(temp_override_file_path).unlink()


def is_windows():
Expand Down
Loading