Skip to content

Commit 3b1d805

Browse files
authored
Test: Implement packet capture fixture using netsniff-ng tool. (#1283)
Remove tcpdump implementation as tcpdump does not provide correct results for compliance checks. Add EBU list pcap upload and compliance check support fixture. Update results by compliance result. Add test config options for packet capture and ebu list server info. Implement capturing packets in RxTxApp engine. Update test cases for RxTxApp with compliance check (with multicast destination address only) - ST20 and ST30 test cases
1 parent 7878416 commit 3b1d805

File tree

78 files changed

+531
-1097
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+531
-1097
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import logging
2+
import os
3+
import time
4+
5+
import requests
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class PcapComplianceClient:
11+
def __init__(
12+
self,
13+
ebu_ip,
14+
user,
15+
password,
16+
pcap_file=None,
17+
pcap_id=None,
18+
proxies={"http": "", "https": "", "ftp": ""},
19+
):
20+
"""
21+
Initialize the client.
22+
"""
23+
self.ebu_ip = ebu_ip
24+
self.user = user
25+
self.password = password
26+
self.pcap_file = pcap_file
27+
self.proxies = proxies
28+
self.pcap_id = pcap_id
29+
self.token = None
30+
self.session = requests.Session()
31+
self.session.trust_env = False # Do not use system proxy settings
32+
self.authenticate()
33+
34+
def authenticate(self):
35+
"""
36+
Authenticate with the EBU server and store the access token.
37+
"""
38+
url = f"http://{self.ebu_ip}/auth/login"
39+
headers = {"Content-Type": "application/json"}
40+
data = {"username": self.user, "password": self.password}
41+
response = self.session.post(
42+
url, headers=headers, json=data, verify=False, proxies=self.proxies
43+
)
44+
response.raise_for_status()
45+
self.token = response.json().get("content", {}).get("token")
46+
if not self.token:
47+
raise Exception("Authentication failed: No token received.")
48+
49+
def upload_pcap(self):
50+
"""
51+
Upload the PCAP file to the EBU server and store the returned UUID.
52+
Returns the UUID of the uploaded PCAP.
53+
"""
54+
url = f"http://{self.ebu_ip}/api/pcap"
55+
headers = {"Authorization": f"Bearer {self.token}"}
56+
if self.pcap_file:
57+
with open(self.pcap_file, "rb") as f:
58+
files = {
59+
"pcap": (
60+
os.path.basename(self.pcap_file),
61+
f,
62+
"application/vnd.tcpdump.pcap",
63+
)
64+
}
65+
response = self.session.put(
66+
url,
67+
headers=headers,
68+
files=files,
69+
verify=False,
70+
proxies=self.proxies,
71+
)
72+
response.raise_for_status()
73+
self.pcap_id = response.json().get("uuid")
74+
if not self.pcap_id:
75+
raise Exception("Upload failed: No UUID received.")
76+
return self.pcap_id
77+
78+
def download_report(self, retries=3):
79+
"""
80+
Download the compliance report for the uploaded PCAP file.
81+
returns the report as a JSON object.
82+
"""
83+
url = f"http://{self.ebu_ip}/api/pcap/{self.pcap_id}/report?type=json"
84+
headers = {"Authorization": f"Bearer {self.token}"}
85+
while retries > 0:
86+
response = self.session.get(
87+
url, headers=headers, verify=False, proxies=self.proxies
88+
)
89+
response.raise_for_status()
90+
report = response.json()
91+
if report.get("analyzed", False):
92+
return report
93+
time.sleep(1)
94+
retries -= 1
95+
logger.error(
96+
f"Report is not ready after {retries} seconds, skipping compliance check"
97+
)
98+
return False
99+
100+
def check_compliance(self, report=None):
101+
"""
102+
Check the compliance result from the downloaded report.
103+
Returns True if compliant, False otherwise.
104+
"""
105+
if report is None:
106+
report = self.download_report()
107+
is_compliant = report.get("not_compliant_streams", 1) == 0
108+
if is_compliant:
109+
is_compliant = not any(
110+
[
111+
1 if stream.get("media_type") == "unknown" else 0
112+
for stream in report.get("streams", [])
113+
]
114+
)
115+
return is_compliant, report
116+
117+
def delete_pcap(self, pcap_id=None):
118+
"""
119+
Delete the PCAP file and its report from the EBU server.
120+
If pcap_id is not provided, uses self.pcap_id.
121+
"""
122+
if pcap_id is None:
123+
pcap_id = self.pcap_id
124+
if not pcap_id:
125+
raise ValueError("No PCAP ID provided for deletion.")
126+
url = f"http://{self.ebu_ip}/api/pcap/{pcap_id}"
127+
headers = {"Authorization": f"Bearer {self.token}"}
128+
response = self.session.delete(
129+
url, headers=headers, verify=False, proxies=self.proxies
130+
)
131+
if response.status_code == 200:
132+
logger.info(f"PCAP {pcap_id} deleted successfully from EBU server.")
133+
return True
134+
else:
135+
logger.error(
136+
f"Failed to delete PCAP {pcap_id}: {response.status_code} {response.text}"
137+
)
138+
return False
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import argparse
2+
import os
3+
4+
from compliance_client import PcapComplianceClient
5+
6+
7+
def parse_args():
8+
parser = argparse.ArgumentParser(
9+
description="Upload a PCAP file to the EBU LIST server."
10+
)
11+
parser.add_argument(
12+
"--pcap",
13+
type=str,
14+
required=True,
15+
help="Path to the PCAP file to upload.",
16+
)
17+
parser.add_argument(
18+
"--ip",
19+
type=str,
20+
required=True,
21+
help="EBU LIST server IP address.",
22+
)
23+
parser.add_argument(
24+
"--user",
25+
type=str,
26+
required=True,
27+
help="Username for EBU LIST service.",
28+
)
29+
parser.add_argument(
30+
"--password",
31+
type=str,
32+
required=True,
33+
help="Password for EBU LIST service.",
34+
)
35+
parser.add_argument(
36+
"--proxy",
37+
type=str,
38+
required=False,
39+
help="Proxy for uploading to EBU LIST service.",
40+
)
41+
return parser.parse_args()
42+
43+
44+
def upload_pcap(file_path, ip, login, password, proxies):
45+
# Check for login and password
46+
if not ip or not login or not password:
47+
raise Exception("IP address, login and password are required.")
48+
49+
# Check if the file exists before proceeding
50+
if not os.path.isfile(file_path):
51+
raise Exception(f"File not found: {file_path}")
52+
53+
# Create the uploader object and upload the PCAP file
54+
uploader = PcapComplianceClient(
55+
ebu_ip=ip, user=login, password=password, pcap_file=file_path, proxies=proxies
56+
)
57+
uuid = uploader.upload_pcap() # Upload the PCAP file and get the UUID
58+
return uuid
59+
60+
61+
if __name__ == "__main__":
62+
args = parse_args()
63+
# TODO: Handle proxies better, so each type can have a proper value
64+
proxies = None
65+
if args.proxy is not None:
66+
proxies = {"http": args.proxy, "https": args.proxy, "ftp": args.proxy}
67+
uuid = upload_pcap(args.pcap, args.ip, args.user, args.password, proxies)
68+
# Print extractable UUID
69+
print(f">>>UUID: {uuid}")
Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
build: MTL_PATH_PLACEHOLDER
22
mtl_path: MTL_PATH_PLACEHOLDER
33
media_path: /mnt/media
4-
capture_cfg:
5-
enable: false
6-
test_name: test_name
7-
pcap_dir: /mnt/ramdisk/pcap
8-
capture_time: 5
9-
interface: null
104
ramdisk:
115
media:
126
mountpoint: /mnt/ramdisk/media
137
size_gib: 32
148
pcap:
159
mountpoint: /mnt/ramdisk/pcap
1610
size_gib: 768
11+
compliance: false
12+
capture_cfg:
13+
enable: false
14+
pcap_dir: /mnt/ramdisk/pcap
15+
ebu_server:
16+
ebu_ip: EBU_IP_PLACEHOLDER
17+
user: EBU_LOGIN_PLACEHOLDER
18+
password: EBU_PASS_PLACEHOLDER
19+
proxy: false

tests/validation/conftest.py

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,19 @@
1212
import pytest
1313
from common.mtl_manager.mtlManager import MtlManager
1414
from common.nicctl import Nicctl
15+
from compliance.compliance_client import PcapComplianceClient
16+
from create_pcap_file.netsniff import NetsniffRecorder, calculate_packets_per_frame
1517
from mfd_common_libs.custom_logger import add_logging_level
1618
from mfd_common_libs.log_levels import TEST_FAIL, TEST_INFO, TEST_PASS
1719
from mfd_connect.exceptions import ConnectionCalledProcessError
18-
from mtl_engine.const import LOG_FOLDER, TESTCMD_LVL
20+
from mtl_engine.const import FRAMES_CAPTURE, LOG_FOLDER, TESTCMD_LVL
1921
from mtl_engine.csv_report import (
2022
csv_add_test,
2123
csv_write_report,
24+
get_compliance_result,
2225
update_compliance_result,
2326
)
27+
from mtl_engine.execute import log_fail
2428
from mtl_engine.ramdisk import Ramdisk
2529
from mtl_engine.stash import (
2630
clear_issue,
@@ -249,23 +253,75 @@ def log_session():
249253
csv_write_report(f"{LOG_FOLDER}/latest/report.csv")
250254

251255

252-
@pytest.fixture(scope="session", autouse=True)
253-
def compliance_report(request, log_session, test_config):
254-
"""
255-
This function is used for compliance check and report.
256-
"""
257-
# TODO: Implement compliance check logic. When tcpdump pcap is enabled, at the end of the test session all pcaps
258-
# shall be send into EBU list.
259-
# Pcaps shall be stored in the ramdisk, and then moved to the compliance
260-
# folder or send into EBU list after each test finished and remove it from the ramdisk.
261-
# Compliance report generation logic goes here after yield. Or in another class / function but triggered here.
262-
# AFAIK names of pcaps contains test name so it can be matched with result of each test like in code below.
263-
yield
264-
if test_config.get("compliance", False):
265-
logging.info("Compliance mode enabled, updating compliance results")
266-
for item in request.session.items:
267-
test_case = item.nodeid
268-
update_compliance_result(test_case, "Fail")
256+
@pytest.fixture(scope="function")
257+
def pcap_capture(request, media_file, test_config, hosts, mtl_path):
258+
capture_cfg = test_config.get("capture_cfg", {})
259+
capturer = None
260+
if capture_cfg and capture_cfg.get("enable"):
261+
host = hosts["client"] if "client" in hosts else list(hosts.values())[0]
262+
media_file_info, _ = media_file
263+
test_name = request.node.name
264+
if "frames_number" not in capture_cfg and "capture_time" not in capture_cfg:
265+
capture_cfg["packets_number"] = (
266+
FRAMES_CAPTURE * calculate_packets_per_frame(media_file_info)
267+
)
268+
logger.info(
269+
f"Capture {capture_cfg['packets_number']} packets for {FRAMES_CAPTURE} frames"
270+
)
271+
elif "frames_number" in capture_cfg:
272+
capture_cfg["packets_number"] = capture_cfg[
273+
"frames_number"
274+
] * calculate_packets_per_frame(media_file_info)
275+
logger.info(
276+
f"Capture {capture_cfg['packets_number']} packets for {capture_cfg['frames_number']} frames"
277+
)
278+
capturer = NetsniffRecorder(
279+
host=host,
280+
test_name=test_name,
281+
pcap_dir=capture_cfg.get("pcap_dir", "/tmp"),
282+
interface=host.network_interfaces[0].name,
283+
silent=capture_cfg.get("silent", True),
284+
packets_capture=capture_cfg.get("packets_number", None),
285+
capture_time=capture_cfg.get("capture_time", None),
286+
)
287+
yield capturer
288+
if capturer and capturer.netsniff_process:
289+
ebu_server = test_config.get("ebu_server", {})
290+
if not ebu_server:
291+
logger.error("EBU server configuration not found in test_config.yaml")
292+
return
293+
ebu_ip = ebu_server.get("ebu_ip", None)
294+
ebu_login = ebu_server.get("user", None)
295+
ebu_passwd = ebu_server.get("password", None)
296+
ebu_proxy = ebu_server.get("proxy", None)
297+
proxy_cmd = f" --proxy {ebu_proxy}" if ebu_proxy else ""
298+
compliance_upl = capturer.host.connection.execute_command(
299+
"python3 ./tests/validation/compliance/upload_pcap.py"
300+
f" --ip {ebu_ip}"
301+
f" --user {ebu_login}"
302+
f" --password {ebu_passwd}"
303+
f" --pcap {capturer.pcap_file}{proxy_cmd}",
304+
cwd=f"{str(mtl_path)}",
305+
)
306+
if compliance_upl.return_code != 0:
307+
logger.error(f"PCAP upload failed: {compliance_upl.stderr}")
308+
return
309+
uuid = compliance_upl.stdout.split(">>>UUID: ")[1].strip()
310+
logger.debug(f"PCAP successfully uploaded to EBU LIST with UUID: {uuid}")
311+
uploader = PcapComplianceClient(
312+
ebu_ip=ebu_ip,
313+
user=ebu_login,
314+
password=ebu_passwd,
315+
pcap_id=uuid,
316+
proxies={"http": ebu_proxy, "https": ebu_proxy},
317+
)
318+
result, report = uploader.check_compliance()
319+
update_compliance_result(request.node.nodeid, "Pass" if result else "Fail")
320+
if result:
321+
logger.info("PCAP compliance check passed")
322+
else:
323+
log_fail("PCAP compliance check failed")
324+
logger.info(f"Compliance report: {report}")
269325

270326

271327
@pytest.fixture(scope="function", autouse=True)
@@ -287,20 +343,26 @@ def log_case(request, caplog: pytest.LogCaptureFixture):
287343
clear_issue()
288344
yield
289345
report = request.node.stash[phase_report_key]
290-
if report["setup"].failed:
291-
logging.log(level=TEST_FAIL, msg=f"Setup failed for {case_id}")
346+
347+
def fail_test(stage):
348+
logger.log(level=TEST_FAIL, msg=f"{stage} failed for {case_id}")
292349
os.chmod(logfile, 0o4755)
293-
result = "Fail"
350+
return "Fail"
351+
352+
if report["setup"].failed:
353+
result = fail_test("Setup")
294354
elif ("call" not in report) or report["call"].failed:
295-
logging.log(level=TEST_FAIL, msg=f"Test failed for {case_id}")
296-
os.chmod(logfile, 0o4755)
297-
result = "Fail"
355+
result = fail_test("Test")
298356
elif report["call"].passed:
299-
logging.log(level=TEST_PASS, msg=f"Test passed for {case_id}")
300-
os.chmod(logfile, 0o755)
301-
result = "Pass"
357+
compliance = get_compliance_result(case_id)
358+
if compliance is not None and compliance == "Fail":
359+
result = fail_test("Compliance")
360+
else:
361+
logger.log(level=TEST_PASS, msg=f"Test passed for {case_id}")
362+
os.chmod(logfile, 0o755)
363+
result = "Pass"
302364
else:
303-
logging.log(level=TEST_INFO, msg=f"Test skipped for {case_id}")
365+
logger.log(level=TEST_INFO, msg=f"Test skipped for {case_id}")
304366
result = "Skip"
305367

306368
logger.removeHandler(fh)

0 commit comments

Comments
 (0)