diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1f36827 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.venv +chromedriver +results/* \ No newline at end of file diff --git a/README.md b/README.md index 582f696..781a764 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,14 @@ # MonitorTrackingCoverage -**MonitorTrackingCoverage** is a Python script that allows you to compare to different states of tracking setups for a given set of web sites. +**MonitorTrackingCoverage** is a Python script that allows you to compare two different states of tracking setups for web analytics for a given set of URLs. -This scripts reads all tracked variables from a given URL and puts the result in JSON format to a file. +## Use Case ## -If you run the script at a later time, it will read all tracked variables and compare them to the original variables you read from the URL. The script will document all changes and put the result in JSON format to a file. It will also create an overview containing all scanned URL and put it into an Excel-file. +Imagine you have implemented **Adobe Data Collection** (vulgo: Adobe Launch) and you changed a data element or a rule. Before you publish the changes to production, you want to check whether the changes somehow harm the current setup, e.g. whether they change how dimensions are tracked. This script records tracked dimensions from two different development environments to help you find any issues like that. + +It will read all tracked variables/dimensions from a given URL and a defined environment and write the result to a JSON file. + +After that it will read all variables/dimensions for another environment but the same URL and compare the tracked variables/dimensions to the previous result. You will receive a JSON file and an Excel sheet with all changes and an output to the screen. All files are placed in the subfolder **./results**.
@@ -18,6 +22,15 @@ This is an example of the Excel result: ## Prerequisites ## +### Python Dependencies +``` +python3 -m venv .venv +source .venv/bin/activate +pip3 install -r requirements.txt +``` + +### Chrome + This script uses the **Selenium** browser automation. It requires you to download the headless browser called "**Chrome driver**" from here: https://chromedriver.chromium.org/downloads @@ -132,4 +145,4 @@ See How [to contribute](https://github.com/dbsystel/tracking-tester/blob/main/CO This project is licensed under [Apache-2.0](https://github.com/dbsystel/tracking-tester/blob/main/LICENSE) -Copyright 2023 DB Systel GmbH +Copyright 2023 DB Systel GmbH \ No newline at end of file diff --git a/comparator.py b/comparator.py deleted file mode 100755 index d50ee57..0000000 --- a/comparator.py +++ /dev/null @@ -1,280 +0,0 @@ -# Copyright 2023 DB Systel GmbH -# License Apache-2.0 - -from email import message -import json -from os import error - -from importlib_metadata import abc - -# This class parses JSON objects and compares variables in the predefined format. -# -# { -# "ID": { -# "variables": { -# "var_name": { -# "value": [ 1, 2, 3 ], -# "type": "int" | "float" | "str" | "*", -# "length": -1 | , -# "required": true | false, -# "error": 0 | 1, -# "message": -# }, -# } -# } -# } -# - -class Comparator(): - - keyword = "variables" - obj_original: dict = {} - - defined: bool = False - - succeed: int = 0 - failed: int = 0 - - # The initialization of the class requires a dictionary. - def __init__(self, obj_original: dict) -> None: - - self.define_original(obj_original) - - # The method passes the dictionary and checks if it - # contains the required variables and the correct format. 
- def define_original(self, obj_original: dict) -> None: - - if self.check_json_format(obj_original) == True: - - self.obj_original = obj_original - self.defined = True - - else: - - self.obj_original = None - self.defined = False - raise error("The JSON object could not be read in because the format is not passed as expected.") - - # Returns a value of type boolean if the comparator object - # is initialized - def is_defined(self) -> bool: - return self.defined - - # Returns the number of tested pages from the last test - # run (call of the function check_json()). - def get_tested(self) -> int: - return (self.succeed + self.failed) - - # Returns the number of successfully tested pages from - # the last test run (call to check_json() function). - def get_succeed(self) -> int: - return self.succeed - - # Returns the number of failed tested pages from the last - # test run (call to check_json() function). - def get_failed(self) -> int: - return self.failed - - # Static method which returns the type of an passed object. - @staticmethod - def check_type(obj) -> type: - return type(obj) - - # Static method which passes a value of type boolean if - # the object was defined correct. - @staticmethod - def check_defined(obj) -> bool: - if type(obj) is str and len(obj) > 0: - return True - - if (type(obj) is int or type(obj) is float) and obj != -1: - return True - - return False - - # The method checks that the given format of the JSON file - # is correct and returns a boolean. - @staticmethod - def check_json_format(json: any, orginial: bool = False): - - pages = json - - if type(pages) is not dict: - raise error("[FormatCheck] JSON is not defined as dictionary. " + str(type(pages))) - - if len(pages) <= 0: - raise error("[FormatCheck] No elements available in the JSON. " + str(len(pages))) - - for page in pages: - if type(pages[page]) is not dict: - raise error("[FormatCheck] Page '" + str(page) + "' is not defined as dictionary. 
" + str(type(pages[page]))) - - - if "variables" not in pages[page]: - raise error("[FormatCheck] There are no variable definitions for the page '" + str(page) + "'.") - - - variables = pages[page][Comparator.keyword] - for variable in variables: - - if type(pages[page][Comparator.keyword][variable]) is not dict: - raise error("[FormatCheck] Variable '" + str(variable) + "' in page '" + str(page) + "' is not defined as dictionary. " + str(type(pages[page][variable]))) - - - if "value" not in pages[page][Comparator.keyword][variable]: - raise error("[FormatCheck] Value in variable '" + str(variable) + "' in page '" + str(page) + "' is not defined.") - - _value = pages[page][Comparator.keyword][variable]["value"] - if type(_value) is not list: - raise error("[FormatCheck] The type of value in variable '" + str(variable) + "' in page '" + str(page) + "' is not a list.") - - if "type" not in pages[page][Comparator.keyword][variable]: - raise error("[FormatCheck] Type in variable '" + str(variable) + "' in page '" + str(page) + "' is not defined.") - - - _type = pages[page][Comparator.keyword][variable]["type"] - if _type != "int" and _type != "float" and _type != "str" and _type != "*": - raise error("[FormatCheck] Value for type in variable '" + str(variable) + "' in page '" + str(page) + "' is not invalid. " + str(_type)) - - - if "length" not in pages[page]["variables"][variable]: - raise error("[FormatCheck] Length in variable '" + str(variable) + "' in page '" + str(page) + "' is not defined.") - - - if "required" not in pages[page]["variables"][variable]: - raise error("[FormatCheck] Required in variable '" + str(variable) + "' in page '" + str(page) + "' is not defined.") - - - _required = pages[page]["variables"][variable]["required"] - if _required is not True and _required is not False: - raise error("[FormatCheck] Value for required in variable '" + str(variable) + "' in page '" + str(page) + "' is not invalid. 
" + str(_required)) - - - return True - - # Checks the passed JSON object for the correct format and - # then if the passed values match what is expected from - # the original JSON. - def check_json(self, obj_test, obj_mapping) -> dict: - - if self.check_json_format(obj_test) != True: - raise error("The test was not processed because the format of the test JSON is not passed as expected.") - - obj_result = obj_test.copy() - - self.succeed = 0 - self.failed = 0 - - # loop through the pages - for original_page in self.obj_original: - - if original_page not in obj_test: - raise error("Execution stopped! Page '" + str(original_page) + "' was not found in the JSON object.") - # or use continue for ignore the missing pages - - original_variables = self.obj_original[original_page][self.keyword] - # loop through the adobe analytics variables - for original_variable in original_variables: - - original_variable_def = original_variables[original_variable] - - # check if the variable exists in the JSON - if original_variable not in obj_test[original_page][self.keyword]: - - self.failed += 1 - - obj_result[original_page][self.keyword][original_variable] = { - "value": [""], - "message": "Test failed. 
Variable was not found in the list of variables.", - "error": 1 - } - - if original_variable in obj_mapping: - obj_result[original_page][self.keyword][original_variable]['variable_mapping'] = obj_mapping[original_variable] - else: - obj_result[original_page][self.keyword][original_variable]['variable_mapping'] = '-' - - continue - - tested_variable_result = obj_result[original_page][self.keyword][original_variable] - - if original_variable in obj_mapping: - tested_variable_result['variable_mapping'] = obj_mapping[original_variable] - else: - tested_variable_result['variable_mapping'] = '-' - - tested_variable = obj_test[original_page][self.keyword][original_variable] - - # if dict, then check the entries in the dictionary - if type(tested_variable) is not dict: - tested_variable_result["message"] = "Test failed. A dictionary was expected as a value for the key '" + str(original_variable) + "'." - tested_variable_result["error"] = 1 - continue - - # check if value is required - if original_variable_def["required"] == False: - self.succeed += 1 - tested_variable_result["message"] = "Test was successful." - tested_variable_result["error"] = 0 - continue - - # check if the variable type is defined and matches - if original_variable_def["type"] != "*": - if original_variable_def["type"] != tested_variable["type"]: - self.failed += 1 - tested_variable_result["message"] = "Test failed. The type of the variable does not match the expected type." - tested_variable_result["error"] = 1 - continue - - # check if the variable length is defined and matches - if original_variable_def["length"] != -1: - if original_variable_def["length"] != tested_variable["length"]: - - self.failed += 1 - tested_variable_result["message"] = "Test failed. The length of the variable does not match the expected length." 
- tested_variable_result["error"] = 1 - continue - - # check if tested value is part of allowed values - # if original list of allowed values is 0, every value is ok - if type(original_variable_def["value"]) is list and len(original_variable_def["value"]) > 0: - # not necessary, it's always a list and it always contains 1 item only - # if type(tested_variable["value"]) is list: - # test_value = tested_variable["value"][0] - # else: - # test_value = tested_variable["value"] - - if tested_variable["value"][0] not in original_variable_def["value"]: - - self.failed += 1 - # TODO: add expected and actual value here - tested_variable_result["message"] = "Test failed. The value of the variable is not included in the list of expected values." - tested_variable_result["error"] = 1 - continue - - self.succeed += 1 - tested_variable_result["message"] = "Test was successful." - tested_variable_result["error"] = 0 - - return obj_result - - - - - -if __name__ == '__main__': - - with open("test.json") as jfile: - original = json.load(jfile) - - with open("test_copy.json") as jfile: - test = json.load(jfile) - - comparator = Comparator(original) - result = comparator.check_json(test) - - print("##################### Comparator #####################") - print("\tTested: " + str(comparator.get_tested()) + " | Succeed: " + str(comparator.get_succeed()) + " | Failed: " + str(comparator.get_failed()) + "\n") - print("######################################################") - - print(str(result) + "\n")
diff --git a/compare.py b/compare.py
new file mode 100755
index 0000000..6cc5aff
--- /dev/null
+++ b/compare.py
@@ -0,0 +1,152 @@
+# Copyright 2023 DB Systel GmbH
+# License Apache-2.0
+
+# This class parses JSON objects and compares variables in the predefined format.
+#
+# {
+#   "ID": {
+#       "variables": {
+#           "var_name": {
+#               "value": [ 1, 2, 3 ],
+#               "type": "int" | "float" | "str" | "*",
+#               "length": -1 | ,
+#               "required": true | false,
+#               "error": 0 | 1,
+#               "message":
+#           },
+#       }
+#   }
+# }
+#
+
+
+class Compare():
+    """Compare two recorded tracking states variable by variable."""
+
+    def compare(self,
+                pages_before: dict = None,
+                pages_after: dict = None,
+                var_mapping: dict = None) -> dict:
+        """
+        Compare two sets of pages before and after a change, and generate a report.
+
+        Every variable of every page in `pages_before` is checked against the
+        same variable in `pages_after`. The outcome is written into
+        `pages_before` itself (keys `variable_mapping`, `error` and, where
+        applicable, `message`), so the passed dictionary is modified in place.
+
+        Parameters:
+        - pages_before (dict): Pages before the change, in the format described
+          at the top of this file. Treated as empty when None.
+        - pages_after (dict): Pages after the change, same format. Treated as
+          empty when None.
+        - var_mapping (dict, optional): Maps variable names to readable names.
+          Variables without a mapping get "-".
+
+        Returns:
+        - result (dict): `pages_before`, annotated with the comparison result.
+
+        Raises:
+        - KeyError: If a page in `pages_before` has no "variables" entry or a
+          variable lacks one of the keys "required", "type", "length", "value".
+
+        Example:
+        ```
+        before = {'page1': {'variables': {'v1': {
+            'value': ['a'], 'type': 'str', 'length': 1, 'required': True}}}}
+        after = {'page1': {'variables': {'v1': {
+            'value': ['b'], 'type': 'str', 'length': 1}}}}
+
+        comparison_result = Compare().compare(pages_before=before, pages_after=after)
+        # comparison_result['page1']['variables']['v1']['error'] == 1
+        ```
+        """
+
+        # None defaults would otherwise fail on iteration / membership tests
+        pages_before = {} if pages_before is None else pages_before
+        pages_after = {} if pages_after is None else pages_after
+        var_mapping = {} if var_mapping is None else var_mapping
+
+        # loop through all pages from state "before"
+        # (iterate over a copy of the keys, entries are replaced while looping)
+        for page in list(pages_before):
+
+            # check if state "after" contains current page
+            if page not in pages_after:
+                pages_before[page] = {
+                    "message": f"Page `{page}`not found.",
+                    "error": 1
+                }
+                # the page entry was replaced, there are no variables left to compare
+                continue
+
+            vars_before = pages_before[page]["variables"]
+            vars_after = pages_after[page].get("variables", {})
+
+            # loop through all variables from state "before"
+            for var in list(vars_before):
+
+                # if exists, get variable mapping ("readable name")
+                mapping = var_mapping.get(var, "-")
+                expected = vars_before[var]
+
+                # a non-dict entry cannot hold the result, replace it by a report entry
+                if not isinstance(expected, dict):
+                    vars_before[var] = {
+                        "variable_mapping": mapping,
+                        "message": self._not_a_dict_message(var),
+                        "error": 1
+                    }
+                    continue
+
+                expected["variable_mapping"] = mapping
+
+                error, message = self._check_variable(var, expected, vars_after)
+                if message is not None:
+                    expected["message"] = message
+                expected["error"] = error
+
+        return pages_before
+
+    @staticmethod
+    def _not_a_dict_message(var) -> str:
+        """Build the message for a variable entry that is not a dictionary."""
+        return "Test failed. A dictionary was expected as a value for the key '" + str(var) + "'."
+
+    def _check_variable(self, var, expected: dict, vars_after: dict) -> tuple:
+        """
+        Check one variable of state "before" against state "after".
+
+        Returns a tuple (error, message): error is 0 or 1, message is None
+        when there is nothing to report.
+        """
+
+        # check if the variable exists in state "after"
+        if var not in vars_after:
+            return 1, f"Variable `{var}` was not found."
+
+        actual = vars_after[var]
+        if not isinstance(actual, dict):
+            return 1, self._not_a_dict_message(var)
+
+        # check if variable is mandatory/required
+        if expected["required"] is False:
+            return 0, "Not required."
+
+        # check if the variable type is defined and matches
+        if expected["type"] != "*" and expected["type"] != actual.get("type"):
+            return 1, f'The type of the variable does not match the expected type: {expected["type"]}'
+
+        # check if the variable length is defined and matches
+        if expected["length"] > 0 and expected["length"] != actual.get("length"):
+            return 1, f'The length of the variable does not match the expected length: {expected["length"]}'
+
+        # check if tested value is an allowed value
+        # (an empty value list in state "after" never matches)
+        actual_values = actual.get("value") or []
+        if not actual_values or actual_values[0] not in expected["value"]:
+            # values may be numbers, str.join() only accepts strings
+            allowed = ", ".join(str(value) for value in expected["value"])
+            return 1, f'The value of the variable is not included in the list of expected values: {allowed}'
+
+        return 0, None
diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..93c684c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +selenium==4.1.0 +selenium-wire==5.1.0 +lxml==4.6.3 +pandas==1.3.3 +openpyxl==3.1.2 \ No newline at end of file diff --git a/run.py b/run.py index de7ea5d..da7c87c 100755 --- a/run.py +++ b/run.py @@ -1,31 +1,32 @@ +#! # Copyright 2023 DB Systel GmbH # License Apache-2.0 import time -import sys, urllib3, os -# from selenium import webdriver +from datetime import datetime # date time handling +import sys, os + +# selenium from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from seleniumwire import webdriver # wrapper to get network requests from browser and also modify LaunchRequests in real time (https://stackoverflow.com/questions/31354352/selenium-how-to-inject-execute-a-javascript-in-to-a-page-before-loading-executi) -import pickle # to save / load cookies from pathlib import Path # check if cookie dump exists from urllib.parse import urlparse, parse_qs, urldefrag # extract get parameters from url -import json # export result - -# modify requests before rendering of page https://stackoverflow.com/questions/31354352/selenium-how-to-inject-execute-a-javascript-in-to-a-page-before-loading-executi -from lxml import html -from lxml.etree import ParserError -from lxml.html import builder -import argparse +import argparse # to get
runtime arguments -import pandas as pd +import pandas as pd # for export to Excel +import json # for export to JSON -from comparator import Comparator +from compare import Compare # small class to compare two dicts -# to read available width in terminal output -from os import get_terminal_size +from os import get_terminal_size # get available width in terminal output -from datetime import datetime +# TODO: clean up unused libs +# modify requests before rendering of page https://stackoverflow.com/questions/31354352/selenium-how-to-inject-execute-a-javascript-in-to-a-page-before-loading-executi +# from lxml import html +# from lxml.etree import ParserError +# from lxml.html import builder +#import pickle # to save / load cookies def get_real_type(string: str) -> str: @@ -90,6 +91,9 @@ def __init__(self, # this would overwrite existing result file # which could be unexpected and hence unwanted + if not os.path.exists('results'): + os.makedirs('results') + if mode == 'init' and focus is None: self.init_driver(silent, mode) @@ -132,10 +136,12 @@ def __init__(self, if focus is not None: self.original = {focus: self.original[focus]} - comparator = Comparator(self.original) + compare = Compare() # TODO: add stop_on_error flag to stop on every error, makes it easier to fix errors - self.result = comparator.check_json(self.result, self.var_mapping) + self.result = compare.compare(pages_before=self.result, + pages_after=self.original, + var_mapping=self.var_mapping) # don't create the excel output when focus page is defined if focus is None: @@ -176,7 +182,6 @@ def switch_tag_container(self, request): return request - def analyse_result(self): self.df_results_analysed = pd.DataFrame() @@ -274,6 +279,11 @@ def setup(self, settings, env, focus): self.var_mapping = settings['mapping'] + if "cookies" in settings: + self.cookies = settings['cookies'] + else: + self.cookies = None + # keep those for later use: automatically parse a whole website?
# self.urls_parsed = [] # a plain list of all urls to parse # self.urls_to_parse_next = [] # a plain list of all urls to be parsed @@ -349,12 +359,40 @@ def parse_page(self, url) -> dict: # for cookie in cookies: # driver.add_cookie(cookie) + + # https://stackoverflow.com/a/63220249 + # this enables network tracking, this allows us to set cookies before the actual request + # otherwise we could not place cookies in the website's domain + self.driver.execute_cdp_cmd('Network.enable', {}) + + # set cookies before actual request is made + if self.cookies is not None: + for cookie in self.cookies: + self.driver.execute_cdp_cmd('Network.setCookie', { + 'name' : cookie['name'], + 'value' : cookie['value'], + 'domain' : cookie['domain'] + }) + + # this disables network tracking again + self.driver.execute_cdp_cmd('Network.disable', {}) + self.driver.get(url) try: - self.driver.wait_for_request(self.adobe_analytics_host, 5) + self.driver.wait_for_request(self.adobe_analytics_host, 1) except: - print(f'Could not find tracking container on {url}, do you provided the correct container locations?') + + # Access requests via the `requests` attribute + for request in self.driver.requests: + if request.response: + print( + request.url, + request.response.status_code, + request.response.headers['Content-Type'] + ) + + print(f'Could not find tracking container `{self.adobe_analytics_host}`on `{url}`, do you provided the correct container locations?') sys.exit() # grace period to give the onsite script time to work @@ -422,6 +460,7 @@ def parse_page(self, url) -> dict: args_parser.add_argument('--mode', dest='mode', required=False, type=str, default='test', choices=['test', 'init', 'analyse'], help='init: initially read the original state, test: compare original state and current state, analyse: analyse test status and create a report') + # TODO: improve/ease file handling, we don't need original/test folders, we can just assume them from env argument args_parser.add_argument('--original',
dest='original', required=True, type=str, help='filename that contains original tracked variables in JSON format') diff --git a/settings.json b/settings.json index 968408a..bfd81a5 100644 --- a/settings.json +++ b/settings.json @@ -15,6 +15,5 @@ "v6": "eVar 6 - Some Dimension", "c11": "prop 11 - Another Dimension" } - } } \ No newline at end of file