Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance link down handler to avoid race conditions in the failover path #438

Merged
merged 7 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,37 @@ def update_evc(self, evc: Dict) -> Optional[Dict]:
)
return updated

def update_evcs(self, circuit_ids: list, metadata: dict, action: str):
"""Update a bulk of EVC"""
def update_evcs(self, evcs: list[dict]) -> int:
    """Bulk-update EVC documents and return how many were modified.

    Each dict in ``evcs`` must carry an ``"id"`` key; it is used as the
    document ``_id`` in the update filter. Every document in the batch is
    stamped with the same ``updated_at`` value. The dicts passed in are
    not modified.

    Returns 0 without touching the database when ``evcs`` is empty.
    """
    if not evcs:
        # Nothing to write; skip the database round trip.
        return 0

    utc_now = datetime.utcnow()
    ops = []
    for evc in evcs:
        # Build the payload from a merged copy so the caller's dict is
        # left untouched (no "updated_at" is written back into it).
        payload = {**evc, "updated_at": utc_now, "_id": evc["id"]}
        model = EVCBaseDoc(**payload).dict(exclude_none=True)
        ops.append(
            UpdateOne(
                {"_id": evc["id"]},
                {
                    "$set": model,
                    # NOTE(review): $setOnInsert only takes effect on an
                    # upsert and no upsert flag is passed here - confirm
                    # whether that is intended.
                    "$setOnInsert": {"inserted_at": utc_now},
                },
            )
        )
    return self.db.evcs.bulk_write(ops).modified_count

def update_evcs_metadata(
self, circuit_ids: list, metadata: dict, action: str
):
"""Bulk update EVCs metadata."""
utc_now = datetime.utcnow()
metadata = {f"metadata.{k}": v for k, v in metadata.items()}
if action == "add":
Expand Down
133 changes: 80 additions & 53 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def execute_consistency(self):
stored_circuits.pop(circuit.id, None)
if self.should_be_checked(circuit):
circuits_to_check.append(circuit)
circuit.try_setup_failover_path()
circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
for circuit in circuits_to_check:
is_checked = circuits_checked.get(circuit.id)
Expand Down Expand Up @@ -470,7 +471,7 @@ def bulk_add_metadata(self, request: Request) -> JSONResponse:
data = get_json_or_400(request, self.controller.loop)
circuit_ids = data.pop("circuit_ids")

self.mongo_controller.update_evcs(circuit_ids, data, "add")
self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

fail_evcs = []
for _id in circuit_ids:
Expand Down Expand Up @@ -511,7 +512,9 @@ def bulk_delete_metadata(self, request: Request) -> JSONResponse:
data = get_json_or_400(request, self.controller.loop)
key = request.path_params["key"]
circuit_ids = data.pop("circuit_ids")
self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
self.mongo_controller.update_evcs_metadata(
circuit_ids, {key: ""}, "del"
)

fail_evcs = []
for _id in circuit_ids:
Expand Down Expand Up @@ -800,14 +803,14 @@ def handle_interface_link_down(self, interface):
"""
Handler for interface link_down events
"""
log.info("Event handle_interface_link_down %s", interface)
for evc in self.get_evcs_by_svc_level():
with evc.lock:
log.info("Event handle_interface_link_down %s", interface)
viniarck marked this conversation as resolved.
Show resolved Hide resolved
evc.handle_interface_link_down(
interface
)

@listen_to("kytos/topology.link_down")
@listen_to("kytos/topology.link_down", pool="dynamic_single")
def on_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
self.handle_link_down(event)
Expand All @@ -821,31 +824,39 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
evcs_normal = []
check_failover = []
for evc in self.get_evcs_by_svc_level():
if evc.is_affected_by_link(link):
# if there is no failover path, handles link down the
# traditional way
if (
not getattr(evc, 'failover_path', None) or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
f"Ignore Failover path for {evc} due to error: {err}"
)
evcs_normal.append(evc)
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
else:
check_failover.append(evc)
with evc.lock:
if evc.is_affected_by_link(link):
evc.affected_by_link_at = event.timestamp
# if there is no failover path, handles link down the
# traditional way
if (
not evc.failover_path or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
evcs_normal.append(evc)
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
italovalcy marked this conversation as resolved.
Show resolved Hide resolved
evcs_with_failover.append(evc)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)

while switch_flows:
offset = settings.BATCH_SIZE or None
Expand All @@ -866,33 +877,34 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
switch_flows[dpid] = switch_flows[dpid][offset:]
time.sleep(settings.BATCH_INTERVAL)

for evc in evcs_with_failover:
with evc.lock:
old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = old_path
evc.sync()
emit_event(self.controller, "redeployed_link_down",
content=map_evc_event_content(evc))
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)

for evc in evcs_normal:
emit_event(
self.controller,
"evc_affected_by_link_down",
content={"link_id": link.id} | map_evc_event_content(evc)
content={"link": link} | map_evc_event_content(evc)
)

# After handling the hot path, check if new failover paths are needed.
# Note that EVCs affected by link down will generate a KytosEvent for
# deployed|redeployed, which will trigger the failover path setup.
# Thus, we just need to further check the check_failover list
evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
emit_event(
self.controller,
"redeployed_link_down",
content=map_evc_event_content(evc)
)
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
for evc in check_failover:
if evc.is_failover_path_affected_by_link(link):
with evc.lock:
evc.setup_failover_path()
evcs_to_update.append(evc.as_dict())

self.mongo_controller.update_evcs(evcs_to_update)

emit_event(
self.controller,
"cleanup_evcs_old_path",
content={"evcs": evcs_with_failover + check_failover}
)

@listen_to("kytos/mef_eline.evc_affected_by_link_down")
def on_evc_affected_by_link_down(self, event):
Expand All @@ -902,14 +914,16 @@ def on_evc_affected_by_link_down(self, event):
def handle_evc_affected_by_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
evc = self.circuits.get(event.content["evc_id"])
link_id = event.content['link_id']
link = event.content['link']
if not evc:
return
with evc.lock:
if not evc.is_affected_by_link(link):
return
result = evc.handle_link_down()
event_name = "error_redeploy_link_down"
if result:
log.info(f"{evc} redeployed due to link down {link_id}")
log.info(f"{evc} redeployed due to link down {link.id}")
event_name = "redeployed_link_down"
emit_event(self.controller, event_name,
content=map_evc_event_content(evc))
Expand All @@ -924,8 +938,21 @@ def handle_evc_deployed(self, event):
evc = self.circuits.get(event.content["evc_id"])
if not evc:
return
with evc.lock:
evc.setup_failover_path()
evc.try_setup_failover_path()

@listen_to("kytos/mef_eline.cleanup_evcs_old_path")
def on_cleanup_evcs_old_path(self, event):
    """Listen for cleanup_evcs_old_path events and delegate to the handler."""
    # Kept as a thin wrapper: all the work happens in the handler method.
    self.handle_cleanup_evcs_old_path(event)

def handle_cleanup_evcs_old_path(self, event):
    """Remove the flows of each EVC's old path and reset that path.

    Reads the list under ``event.content["evcs"]`` (empty when the key is
    missing). EVCs whose ``old_path`` is falsy are skipped. For the rest,
    the flows of ``old_path`` are removed through ``remove_path_flows`` and
    ``old_path`` is then replaced by an empty ``Path``.
    """
    for evc in event.content.get("evcs", []):
        if not evc.old_path:
            # No previous path recorded for this EVC; nothing to clean up.
            continue
        evc.remove_path_flows(evc.old_path)
        # Reset only after the removal call so the path is not lost first.
        evc.old_path = Path([])

@listen_to("kytos/topology.topology_loaded")
def on_topology_loaded(self, event): # pylint: disable=unused-argument
Expand Down
10 changes: 10 additions & 0 deletions models/evc.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ def __init__(self, controller, **kwargs):
self.current_links_cache = set()
self.primary_links_cache = set()
self.backup_links_cache = set()
self.affected_by_link_at = get_time("0001-01-01T00:00:00")
self.old_path = Path([])

self.lock = Lock()

Expand Down Expand Up @@ -872,6 +874,14 @@ def deploy_to_path(self, path=None): # pylint: disable=too-many-branches
log.info(f"{self} was deployed.")
return True

def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
    """Set up a failover path when none exists and enough time has passed.

    Does nothing when ``failover_path`` is already set. Otherwise
    ``setup_failover_path`` is called, under ``self.lock``, only once at
    least ``wait`` seconds have elapsed since ``affected_by_link_at``.
    """
    if self.failover_path:
        return
    # total_seconds() covers the whole interval. timedelta.seconds is only
    # the sub-day remainder (0..86399), so it wraps every 24 hours and is
    # positive even for a negative delta.
    elapsed = (now() - self.affected_by_link_at).total_seconds()
    if elapsed >= wait:
        with self.lock:
            self.setup_failover_path()

def setup_failover_path(self):
"""Install flows for the failover path of this EVC.

Expand Down
14 changes: 11 additions & 3 deletions tests/unit/test_controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@ def test_upsert_evc(self):
self.eline.upsert_evc(self.evc_dict)
assert self.eline.db.evcs.find_one_and_update.call_count == 1

def test_update_evcs(self):
"""Test update_evcs"""
def test_update_evcs_metadata(self):
"""Test update_evcs_metadata"""
circuit_ids = ["123", "456", "789"]
metadata = {"info": "testing"}
self.eline.update_evcs(circuit_ids, metadata, "add")
self.eline.update_evcs_metadata(circuit_ids, metadata, "add")
arg = self.eline.db.evcs.bulk_write.call_args[0][0]
assert len(arg) == 3
assert self.eline.db.evcs.bulk_write.call_count == 1

def test_update_evcs(self):
    """Check update_evcs sends one bulk_write holding one op per EVC."""
    second_evc = dict(self.evc_dict | {"id": "456"})
    self.eline.update_evcs([self.evc_dict, second_evc])

    bulk_write = self.eline.db.evcs.bulk_write
    # First positional argument of the recorded call is the operation list.
    operations = bulk_write.call_args[0][0]
    assert len(operations) == 2
    assert bulk_write.call_count == 1
Loading
Loading