diff --git a/tests/test_cupid_algo.py b/tests/test_cupid_algo.py new file mode 100644 index 0000000..84e43dd --- /dev/null +++ b/tests/test_cupid_algo.py @@ -0,0 +1,261 @@ +import unittest +from types import SimpleNamespace +from typing import List +import math +import pandas as pd +from anytree import LevelOrderIter + +from valentine.algorithms.cupid import ( + cupid_model, + linguistic_matching as cupid_ling, + structural_similarity as cupid_struct, + tree_match as cupid_tree, + schema_element as cupid_elem, + schema_element_node as cupid_node, # noqa: F401 + schema_tree as cupid_tree_mod, +) + +from valentine.data_sources.base_column import BaseColumn +from valentine.data_sources.base_table import BaseTable + + +class DummyColumn(BaseColumn): + def __init__(self, uid, name, dtype, data): + self._uid, self._name, self._dtype, self._data = uid, name, dtype, data + + @property + def unique_identifier(self): return self._uid + @property + def name(self): return self._name + @property + def data_type(self): return self._dtype + @property + def data(self): return self._data + + +class DummyTable(BaseTable): + def __init__(self, uid, name, cols: List[BaseColumn]): + self._uid, self._name, self._cols = uid, name, cols + + @property + def unique_identifier(self): return self._uid + @property + def name(self): return self._name + def get_columns(self) -> List[BaseColumn]: return self._cols + def get_df(self) -> pd.DataFrame: return pd.DataFrame({c.name: c.data for c in self._cols}) + @property + def is_empty(self) -> bool: return False + + +# ---- Patch nltk + wordnet so tests run offline ---- +def _mock_word_tokenize(s: str): + s = s.replace(",", " , ").replace("_", " ") + return s.split() + +def _install_nltk_mocks(): + mock_stopwords = SimpleNamespace(words=lambda lang: ["the", "and"]) + mock_wn = SimpleNamespace( + all_lemma_names=lambda: {"alpha", "beta"}, + synsets=lambda w: [f"{w}_s1", f"{w}_s2"] if w in {"alpha", "beta"} else [], + wup_similarity=lambda s1, s2: 0.5, + ) + cupid_ling.nltk = SimpleNamespace(word_tokenize=_mock_word_tokenize) + cupid_ling.stopwords = mock_stopwords + cupid_ling.wn = mock_wn + + +class TestCupidLinguisticStructural(unittest.TestCase): + def setUp(self): + _install_nltk_mocks() + + def test_snakecase_and_normalization(self): + sc = cupid_ling.snakecase_convert("CamelCaseX") + self.assertEqual(sc, "camel_case_x") + + se = cupid_ling.normalization("HelloWorld, 123 and") + datas = [t.data for t in se.tokens] + types = [t.token_type for t in se.tokens] + self.assertIn("hello", datas) + self.assertIn("world", datas) + self.assertIn(",", datas) + self.assertIn("123", datas) + self.assertIn(cupid_elem.TokenTypes.SYMBOLS, types) + self.assertIn(cupid_elem.TokenTypes.NUMBER, types) + self.assertIn(cupid_elem.TokenTypes.COMMON_WORDS, types) + self.assertIn(cupid_elem.TokenTypes.CONTENT, types) + + def test_token_type_and_similarity(self): + t_num = cupid_elem.Token().add_data("3.14") + t_txt = cupid_elem.Token().add_data("alpha") + self.assertEqual(cupid_ling.add_token_type(t_num), cupid_elem.TokenTypes.NUMBER) + self.assertEqual(cupid_ling.add_token_type(t_txt), cupid_elem.TokenTypes.CONTENT) + + a1 = cupid_elem.Token(); a1.data = "alpha"; a1.token_type = cupid_elem.TokenTypes.CONTENT + b1 = cupid_elem.Token(); b1.data = "beta"; b1.token_type = cupid_elem.TokenTypes.CONTENT + + sim_ab = cupid_ling.name_similarity_tokens([a1], [b1]) + self.assertGreaterEqual(sim_ab, 0.0) + + sim_same = cupid_ling.get_partial_similarity([a1], [a1]) + self.assertEqual(sim_same, 1.0) + + 
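+    def test_reference_edit_similarity_sketch(self):
+        # Illustrative reference only (not library code): compute_similarity_leven is
+        # assumed here to behave like a normalized edit similarity, 1 - dist / max_len,
+        # which always lies in [0, 1]. We recompute that reference value with a plain
+        # DP Levenshtein to document the expected scale; names below are hypothetical.
+        def edit_distance(a: str, b: str) -> int:
+            prev = list(range(len(b) + 1))
+            for i, ca in enumerate(a, 1):
+                cur = [i]
+                for j, cb in enumerate(b, 1):
+                    cur.append(min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (ca != cb)))
+                prev = cur
+            return prev[-1]
+        # "alpha" -> "alp" needs two deletions, so sim = 1 - 2/5 = 0.6
+        sim = 1 - edit_distance("alpha", "alp") / max(len("alpha"), len("alp"))
+        self.assertAlmostEqual(sim, 0.6, places=6)
+        self.assertGreaterEqual(sim, 0.0)
+        self.assertLessEqual(sim, 1.0)
+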
def test_wordnet_and_leven(self): + self.assertEqual(cupid_ling.compute_similarity_wordnet("alpha", "beta"), 0.5) + self.assertTrue(math.isnan(cupid_ling.compute_similarity_wordnet("zzz", "beta"))) + lv = cupid_ling.compute_similarity_leven("alpha", "alp") + self.assertGreaterEqual(lv, 0.0) + self.assertLessEqual(lv, 1.0) + + def test_data_type_and_compatibility(self): + def mk(content: str): + t = cupid_elem.Token(); t.data = content; t.token_type = cupid_elem.TokenTypes.CONTENT; return t + sim = cupid_ling.data_type_similarity([mk("alpha")], [mk("beta")]) + self.assertGreaterEqual(sim, 0.0) + + comp = cupid_ling.compute_compatibility({"alpha", "beta"}) + self.assertIn("alpha", comp) + self.assertIn("beta", comp["alpha"]) + + def test_name_similarity_elements_and_compute_lsim(self): + e1 = cupid_elem.SchemaElement("A") + e2 = cupid_elem.SchemaElement("B") + for w in ["hello", "world"]: + t = cupid_elem.Token(); t.data = w; t.token_type = cupid_elem.TokenTypes.CONTENT; e1.add_token(t) + for w in ["hello", "beta"]: + t = cupid_elem.Token(); t.data = w; t.token_type = cupid_elem.TokenTypes.CONTENT; e2.add_token(t) + e1.add_category("alpha"); e2.add_category("beta") + + nse = cupid_ling.name_similarity_elements(e1, e2) + self.assertGreaterEqual(nse, 0.0) + lsim = cupid_ling.compute_lsim(e1, e2) + self.assertGreaterEqual(lsim, 0.0) + mx = cupid_ling.get_max_ns_category(["alpha"], ["beta"]) + self.assertGreaterEqual(mx, 0.0) + + def test_schema_tree_and_structural_similarity(self): + st = cupid_tree_mod.SchemaTree("DB__X") + root = st.get_node("DB__X") + st.add_node(table_name="T", table_guid="tg", data_type="Table", parent=root) + tbl = st.get_node("T") + st.add_node(table_name="T", table_guid="tg", column_name="C1", column_guid="c1", data_type="int", parent=tbl) + st.add_node(table_name="T", table_guid="tg", column_name="C2", column_guid="c2", data_type="int", parent=tbl) + + st2 = cupid_tree_mod.SchemaTree("DB__Y") + root2 = st2.get_node("DB__Y") + st2.add_node(table_name="U", table_guid="ug", data_type="Table", parent=root2) + tbl2 = st2.get_node("U") + st2.add_node(table_name="U", table_guid="ug", column_name="D1", column_guid="d1", data_type="int", parent=tbl2) + st2.add_node(table_name="U", table_guid="ug", column_name="D2", column_guid="d2", data_type="int", parent=tbl2) + + leaves_s = [n.long_name for n in st.get_leaves()] + leaves_t = [n.long_name for n in st2.get_leaves()] + + # Provide sims for ALL leaf pairs to avoid KeyError inside compute_ssim + sims = { + (s, t): {'wsim': 0.0, 'ssim': 0.0, 'lsim': 0.0} + for s in leaves_s + for t in leaves_t + } + sims[(leaves_s[0], leaves_t[0])]['wsim'] = 1.0 + sims[(leaves_s[0], leaves_t[0])]['ssim'] = 1.0 + + ssim = cupid_struct.compute_ssim(tbl, tbl2, sims, th_accept=0.5) + self.assertFalse(math.isnan(ssim)) + self.assertGreaterEqual(ssim, 0.0) + self.assertLessEqual(ssim, 1.0) + + cupid_struct.change_structural_similarity(leaves_s, leaves_t, sims, factor=2.0) + self.assertEqual(sims[(leaves_s[0], leaves_t[0])]['ssim'], 1.0) + + def test_tree_match_helpers_and_mapping(self): + st = cupid_tree_mod.SchemaTree("DB__A"); root = st.get_node("DB__A") + st.add_node(table_name="T", table_guid="tg", data_type="Table", parent=root) + tbl = st.get_node("T") + st.add_node(table_name="T", table_guid="tg", column_name="C", column_guid="c", data_type="int", parent=tbl) + + st2 = cupid_tree_mod.SchemaTree("DB__B"); root2 = st2.get_node("DB__B") + st2.add_node(table_name="U", table_guid="ug", data_type="Table", parent=root2) + tbl2 = st2.get_node("U") + 
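+        # Shape note (an assumption used throughout this test, consistent with the
+        # "s_t_name, _, s_c_name, _" unpacking elsewhere in this diff): a leaf's
+        # long_name is the 4-tuple (table_name, table_guid, column_name, column_guid),
+        # so table and column names sit at indices 0 and 2.
+        self.assertEqual(len(st.get_leaves()[0].long_name), 4)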
st2.add_node(table_name="U", table_guid="ug", column_name="D", column_guid="d", data_type="int", parent=tbl2) + + comp = {"int": {"int": 1.0}} + l_sims = { (st.get_leaves()[0].long_name, st2.get_leaves()[0].long_name): 0.5 } + sims = cupid_tree.get_sims(st.get_leaves(), st2.get_leaves(), comp, l_sims, leaf_w_struct=0.2) + self.assertIn((st.get_leaves()[0].long_name, st2.get_leaves()[0].long_name), sims) + + new = cupid_tree.recompute_wsim(st, st2, sims, w_struct=0.6, th_accept=0.14) + self.assertTrue(new) + + mapped = cupid_tree.mapping_generation_leaves(st, st2, new, th_accept=0.1) + self.assertIsInstance(mapped, dict) + + # create_output_dict expects a pair of long-name (4-tuples), not the already-mapped keys + ln_pair = (st.get_leaves()[0].long_name, st2.get_leaves()[0].long_name) + out = cupid_tree.create_output_dict(ln_pair, 0.6) + self.assertIsInstance(out, dict) + + # Ensure sims has ALL non-leaf pairs to avoid KeyError in mapping_generation_non_leaves + max_level_s = st.height - 1 + max_level_t = st2.height - 1 + non_leaves_s = [n.long_name for n in LevelOrderIter(st.root, maxlevel=max_level_s)] + non_leaves_t = [n.long_name for n in LevelOrderIter(st2.root, maxlevel=max_level_t)] + + for s_ln in non_leaves_s: + for t_ln in non_leaves_t: + new.setdefault((s_ln, t_ln), {'wsim': 0.0, 'ssim': 0.0, 'lsim': 0.0}) + + # Explicitly ensure the table-table pair exists, then bump wsim + entry = new.setdefault((tbl.long_name, tbl2.long_name), {'wsim': 0.0, 'ssim': 0.0, 'lsim': 0.0}) + entry['wsim'] = 1.0 + + # The function should run and return a list (may be empty depending on structure/thresholds) + non_leaves = cupid_tree.mapping_generation_non_leaves(st, st2, new, th_accept=0.0) + self.assertIsInstance(non_leaves, list) + + def test_cupid_model_top_level(self): + t_src = DummyTable("SUID", "S", [DummyColumn(1, "A", "int", [1])]) + t_tgt = DummyTable("TUID", "T", [DummyColumn(2, "B", "int", [2])]) + + def fake_tree_match(st, tt, cats, *args, **kwargs): + s_leaf = st.get_leaves()[0].long_name + t_leaf = tt.get_leaves()[0].long_name + return {(s_leaf, t_leaf): {'wsim': 1.0, 'ssim': 1.0, 'lsim': 0.0}} + + def fake_recompute_wsim(st, tt, sims, *args, **kwargs): + return sims + + def fake_mapping(st, tt, sims, th): + key = next(iter(sims.keys())) + return {((key[1][0], key[1][2]), (key[0][0], key[0][2])): 1.0} + + # Patch both cupid_model (where Cupid resolves names) and cupid_tree (for consistency) + orig_tm_m, orig_rc_m, orig_map_m = ( + cupid_model.tree_match, + cupid_model.recompute_wsim, + cupid_model.mapping_generation_leaves, + ) + orig_tm, orig_rc, orig_map = ( + cupid_tree.tree_match, + cupid_tree.recompute_wsim, + cupid_tree.mapping_generation_leaves, + ) + try: + cupid_model.tree_match = fake_tree_match + cupid_model.recompute_wsim = fake_recompute_wsim + cupid_model.mapping_generation_leaves = fake_mapping + + cupid_tree.tree_match = fake_tree_match + cupid_tree.recompute_wsim = fake_recompute_wsim + cupid_tree.mapping_generation_leaves = fake_mapping + + matcher = cupid_model.Cupid() + res = matcher.get_matches(t_src, t_tgt) + self.assertIsInstance(res, dict) + self.assertTrue(res) + finally: + cupid_model.tree_match, cupid_model.recompute_wsim, cupid_model.mapping_generation_leaves = ( + orig_tm_m, orig_rc_m, orig_map_m + ) + cupid_tree.tree_match, cupid_tree.recompute_wsim, cupid_tree.mapping_generation_leaves = ( + orig_tm, orig_rc, orig_map + ) diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py new file mode 100644 index 0000000..a560bae --- /dev/null +++ 
b/tests/test_data_sources.py @@ -0,0 +1,136 @@ +import os +import tempfile +import unittest +from typing import List + +import pandas as pd + +from valentine.data_sources.base_column import BaseColumn +from valentine.data_sources.base_table import BaseTable +from valentine.data_sources.utils import get_encoding, get_delimiter, is_date + + +# ---- Minimal concrete implementations for the ABCs ---- + +class DummyColumn(BaseColumn): + def __init__(self, uid: object, name: str, dtype: str, data: List[object]): + self._uid = uid + self._name = name + self._dtype = dtype + self._data = data + + @property + def unique_identifier(self) -> object: + return self._uid + + @property + def name(self) -> str: + return self._name + + @property + def data_type(self) -> str: + return self._dtype + + @property + def data(self) -> list: + return self._data + + +class DummyTable(BaseTable): + def __init__(self, uid: object, name: str, columns: List[BaseColumn], df: pd.DataFrame): + self._uid = uid + self._name = name + self._columns = columns + self._df = df + + @property + def unique_identifier(self) -> object: + return self._uid + + @property + def name(self) -> str: + return self._name + + def get_columns(self) -> List[BaseColumn]: + return self._columns + + def get_df(self) -> pd.DataFrame: + return self._df + + @property + def is_empty(self) -> bool: + return self._df.empty + + +# ---- Tests ---- + +class TestBaseColumnTableAndUtils(unittest.TestCase): + def setUp(self): + self.col1 = DummyColumn(uid=1, name="a", dtype="int64", data=[1, 2, 3]) + self.col2 = DummyColumn(uid=2, name="b", dtype="object", data=["2020-01-01", "x"]) + self.df = pd.DataFrame({"a": [1, 2, 3], "b": ["2020-01-01", "x", "y"]}) + self.table = DummyTable(uid="T1", name="tbl", columns=[self.col1, self.col2], df=self.df) + + def test_basecolumn_str_size_empty(self): + s = str(self.col1) + self.assertIn("Column:", s) + self.assertIn("", s) + self.assertIn("| 1", s) + self.assertEqual(self.col1.size, 3) + self.assertFalse(self.col1.is_empty) + + empty_col = DummyColumn(uid=3, name="c", dtype="float64", data=[]) + self.assertEqual(empty_col.size, 0) + self.assertTrue(empty_col.is_empty) + self.assertIn("", str(empty_col)) + + def test_basetable_str_and_lookup(self): + s = str(self.table) + self.assertIn("Table: tbl", s) + self.assertIn("Column: a", s) + self.assertIn("Column: b", s) + lookup = self.table.get_guid_column_lookup() + self.assertEqual(lookup, {"a": 1, "b": 2}) + self.assertFalse(self.table.is_empty) + + def test_basetable_get_data_type(self): + self.assertEqual(BaseTable.get_data_type([1], "int64"), "int") + self.assertEqual(BaseTable.get_data_type([1.2], "float64"), "float") + self.assertEqual(BaseTable.get_data_type(["2020-01-01"], "object"), "date") + self.assertEqual(BaseTable.get_data_type(["hello"], "object"), "varchar") + self.assertEqual(BaseTable.get_data_type([], "object"), "varchar") + self.assertEqual(BaseTable.get_data_type([], "float64"), "float64") + + def test_is_date(self): + self.assertTrue(is_date("2020-12-31")) + self.assertTrue(is_date(20200101)) # will be str()'d + self.assertFalse(is_date("not-a-date")) + self.assertTrue(is_date("Mon, 5 Jan 2015", fuzzy=True)) + + def test_get_delimiter_and_encoding(self): + with tempfile.TemporaryDirectory() as d: + # delimiter: comma + p_comma = os.path.join(d, "comma.csv") + with open(p_comma, "w", encoding="utf-8") as f: + f.write("a,b,c\n1,2,3\n") + self.assertEqual(get_delimiter(p_comma), ",") + + # delimiter: semicolon + p_sc = os.path.join(d, "semi.csv") + with 
open(p_sc, "w", encoding="utf-8") as f: + f.write("a;b;c\n1;2;3\n") + self.assertEqual(get_delimiter(p_sc), ";") + + # encoding: ASCII -> returns utf-8 + p_ascii = os.path.join(d, "ascii.txt") + with open(p_ascii, "wb") as f: + f.write(b"just ascii lines\nsecond line\n") + self.assertEqual(get_encoding(p_ascii), "utf-8") + + # encoding: non-ascii (latin-1 with 'é') + p_latin1 = os.path.join(d, "latin1.txt") + with open(p_latin1, "wb") as f: + f.write("caf\u00e9\n".encode("latin-1")) + enc = get_encoding(p_latin1) + self.assertIsInstance(enc, str) + self.assertNotEqual(enc.lower(), "ascii") diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 3099b2a..216cabc 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -1,76 +1,135 @@ import unittest -from valentine.metrics import * + from valentine.algorithms.matcher_results import MatcherResults +from valentine.metrics import ( + Precision, + Recall, + F1Score, + PrecisionTopNPercent, + RecallAtSizeofGroundTruth, +) from valentine.metrics.metric_helpers import get_fp, get_tp_fn + class TestMetrics(unittest.TestCase): - def setUp(self): - self.matches = MatcherResults({ - (('table_1', 'Cited by'), ('table_2', 'Cited by')): 0.8374313, - (('table_1', 'Authors'), ('table_2', 'Authors')): 0.83498037, - (('table_1', 'EID'), ('table_2', 'EID')): 0.8214057, - (('table_1', 'Title'), ('table_2', 'DUMMY1')): 0.8214057, - (('table_1', 'Title'), ('table_2', 'DUMMY2')): 0.8114057, - }) - self.ground_truth = [ - ('Cited by', 'Cited by'), - ('Authors', 'Authors'), - ('EID', 'EID'), - ('Title', 'Title'), - ('DUMMY3', 'DUMMY3') + def setUp(self) -> None: + # Scores chosen so that the highest-confidence pairs are the true matches, + # and "Title" has two competing candidates (DUMMY1, DUMMY2) to exercise 1-1 logic. + self.matches = MatcherResults( + { + (("table_1", "Cited by"), ("table_2", "Cited by")): 0.8374313, + (("table_1", "Authors"), ("table_2", "Authors")): 0.83498037, + (("table_1", "EID"), ("table_2", "EID")): 0.8214057, + (("table_1", "Title"), ("table_2", "DUMMY1")): 0.8214057, + (("table_1", "Title"), ("table_2", "DUMMY2")): 0.8114057, + } + ) + + # Five GT pairs: 4 correct, 1 extra (DUMMY3) to influence recall. 
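+        # Under the default one-to-one filter, four predictions survive (the Title
+        # conflict keeps only its best candidate) and three are correct, so
+        # precision = 3/4 and recall = 3/5; see the expected values below.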
+ self.ground_truth = [ + ("Cited by", "Cited by"), + ("Authors", "Authors"), + ("EID", "EID"), + ("Title", "Title"), + ("DUMMY3", "DUMMY3"), ] - def test_precision(self): - precision = self.matches.get_metrics(self.ground_truth, metrics={Precision()}) - assert 'Precision' in precision and precision['Precision'] == 0.75 - - precision_not_one_to_one = self.matches.get_metrics(self.ground_truth, metrics={Precision(one_to_one=False)}) - assert 'Precision' in precision_not_one_to_one and precision_not_one_to_one['Precision'] == 0.6 - - def test_recall(self): - recall = self.matches.get_metrics(self.ground_truth, metrics={Recall()}) - assert 'Recall' in recall and recall['Recall'] == 0.6 - - recall_not_one_to_one = self.matches.get_metrics(self.ground_truth, metrics={Recall(one_to_one=False)}) - assert 'Recall' in recall_not_one_to_one and recall_not_one_to_one['Recall'] == 0.6 - - def test_f1(self): - f1 = self.matches.get_metrics(self.ground_truth, metrics={F1Score()}) - assert 'F1Score' in f1 and round(100*f1['F1Score']) == 67 - - f1_not_one_to_one = self.matches.get_metrics(self.ground_truth, metrics={F1Score(one_to_one=False)}) - assert 'F1Score' in f1_not_one_to_one and f1_not_one_to_one['F1Score'] == 0.6 - - def test_precision_top_n_percent(self): - precision_0 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=0)}) - assert 'PrecisionTop0Percent' in precision_0 and precision_0['PrecisionTop0Percent'] == 0 + # Handy expected values + self.expected_precision_1to1 = 0.75 # 3/4 due to Title conflict + self.expected_recall_1to1 = 0.6 # 3/5 + # Harmonic mean of P=0.75 and R=0.6 -> 0.666666... + self.expected_f1_1to1 = (2 * self.expected_precision_1to1 * self.expected_recall_1to1) / ( + self.expected_precision_1to1 + self.expected_recall_1to1 + ) + + def test_precision(self) -> None: + with self.subTest(one_to_one=True): + precision = self.matches.get_metrics(self.ground_truth, metrics={Precision()}) + self.assertIn("Precision", precision) + self.assertAlmostEqual(precision["Precision"], self.expected_precision_1to1, places=6) + + with self.subTest(one_to_one=False): + precision_n11 = self.matches.get_metrics( + self.ground_truth, metrics={Precision(one_to_one=False)} + ) + self.assertIn("Precision", precision_n11) + self.assertAlmostEqual(precision_n11["Precision"], 0.6, places=6) + + def test_recall(self) -> None: + with self.subTest(one_to_one=True): + recall = self.matches.get_metrics(self.ground_truth, metrics={Recall()}) + self.assertIn("Recall", recall) + self.assertAlmostEqual(recall["Recall"], self.expected_recall_1to1, places=6) + + with self.subTest(one_to_one=False): + recall_n11 = self.matches.get_metrics(self.ground_truth, metrics={Recall(one_to_one=False)}) + self.assertIn("Recall", recall_n11) + self.assertAlmostEqual(recall_n11["Recall"], 0.6, places=6) + + def test_f1(self) -> None: + with self.subTest(one_to_one=True): + f1 = self.matches.get_metrics(self.ground_truth, metrics={F1Score()}) + self.assertIn("F1Score", f1) + self.assertAlmostEqual(f1["F1Score"], self.expected_f1_1to1, places=6) + + with self.subTest(one_to_one=False): + f1_n11 = self.matches.get_metrics(self.ground_truth, metrics={F1Score(one_to_one=False)}) + self.assertIn("F1Score", f1_n11) + self.assertAlmostEqual(f1_n11["F1Score"], 0.6, places=6) + + def test_precision_top_n_percent(self) -> None: + # n=0 -> empty selection + p0 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=0)}) + self.assertIn("PrecisionTop0Percent", p0) + 
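+        # The metric key embeds n via name(): "PrecisionTopNPercent" -> "PrecisionTop0Percent".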
self.assertEqual(p0["PrecisionTop0Percent"], 0) + + # n=50 -> top half of candidates are all correct here + p50 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=50)}) + self.assertIn("PrecisionTop50Percent", p50) + self.assertEqual(p50["PrecisionTop50Percent"], 1.0) + + # n=100 -> equals overall precision + overall_p = self.matches.get_metrics(self.ground_truth, metrics={Precision()}) + p100 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=100)}) + self.assertIn("PrecisionTop100Percent", p100) + self.assertAlmostEqual(p100["PrecisionTop100Percent"], overall_p["Precision"], places=6) + + # n=70 and not one-to-one (allows multiple matches per column) + p70_n11 = self.matches.get_metrics( + self.ground_truth, metrics={PrecisionTopNPercent(n=70, one_to_one=False)} + ) + self.assertIn("PrecisionTop70Percent", p70_n11) + self.assertAlmostEqual(p70_n11["PrecisionTop70Percent"], 0.75, places=6) + + def test_recall_at_size_of_ground_truth(self) -> None: + r = self.matches.get_metrics(self.ground_truth, metrics={RecallAtSizeofGroundTruth()}) + self.assertIn("RecallAtSizeofGroundTruth", r) + self.assertAlmostEqual(r["RecallAtSizeofGroundTruth"], 0.6, places=6) + + def test_metric_helpers(self) -> None: + limit = 2 + tp, fn = get_tp_fn(self.matches, self.ground_truth, n=limit) + self.assertLessEqual(tp, len(self.ground_truth)) + self.assertLessEqual(fn, len(self.ground_truth)) - precision_50 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=50)}) - assert 'PrecisionTop50Percent' in precision_50 and precision_50['PrecisionTop50Percent'] == 1.0 + fp = get_fp(self.matches, self.ground_truth, n=limit) + self.assertLessEqual(fp, limit) - precision = self.matches.get_metrics(self.ground_truth, metrics={Precision()}) - precision_100 = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=100)}) - assert 'PrecisionTop100Percent' in precision_100 and precision_100['PrecisionTop100Percent'] == precision['Precision'] + # With n=2, top-2 predictions should be true matches; GT size is 5 + self.assertEqual(tp, 2) + self.assertEqual(fn, 3) + self.assertEqual(fp, 0) - precision_70_not_one_to_one = self.matches.get_metrics(self.ground_truth, metrics={PrecisionTopNPercent(n=70, one_to_one=False)}) - assert 'PrecisionTop70Percent' in precision_70_not_one_to_one and precision_70_not_one_to_one['PrecisionTop70Percent'] == 0.75 + def test_metric_equals(self) -> None: + a = PrecisionTopNPercent(n=10, one_to_one=False) + b = PrecisionTopNPercent(n=10, one_to_one=False) + c = PrecisionTopNPercent(n=10, one_to_one=True) - def test_recall_at_size_of_ground_truth(self): - recall = self.matches.get_metrics(self.ground_truth, metrics={RecallAtSizeofGroundTruth()}) - assert 'RecallAtSizeofGroundTruth' in recall and recall['RecallAtSizeofGroundTruth'] == 0.6 + self.assertEqual(a, b) + self.assertNotEqual(a, c) + self.assertNotEqual(a, Precision()) - def test_metric_helpers(self): - limit = 2 - tp, fn = get_tp_fn(self.matches, self.ground_truth, n=limit) - assert tp <= len(self.ground_truth) and fn <= len(self.ground_truth) - fp = get_fp(self.matches, self.ground_truth, n=limit) - assert fp <= limit - assert tp == 2 and fn == 3 # Since we limit to 2 of the matches - assert fp == 0 - - def test_metric_equals(self): - assert PrecisionTopNPercent(n=10, one_to_one=False) == PrecisionTopNPercent(n=10, one_to_one=False) - assert PrecisionTopNPercent(n=10, one_to_one=False) != PrecisionTopNPercent(n=10, one_to_one=True) - assert 
PrecisionTopNPercent(n=10, one_to_one=False) != Precision() +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_sf_algo.py b/tests/test_sf_algo.py new file mode 100644 index 0000000..94f385c --- /dev/null +++ b/tests/test_sf_algo.py @@ -0,0 +1,132 @@ +import unittest +from typing import List +import pandas as pd +import networkx as nx + +from valentine.algorithms.similarity_flooding import ( + graph as sf_graph_mod, + node as sf_node_mod, + node_pair as sf_nodepair_mod, + propagation_graph as sf_prop_mod, + similarity_flooding as sf_sf_mod, +) + +from valentine.data_sources.base_column import BaseColumn +from valentine.data_sources.base_table import BaseTable + + +# ------------------------------ +# Minimal concrete Column & Table +# ------------------------------ +class DummyColumn(BaseColumn): + def __init__(self, uid, name, dtype, data): + self._uid = uid + self._name = name + self._dtype = dtype + self._data = data + + @property + def unique_identifier(self): return self._uid + @property + def name(self): return self._name + @property + def data_type(self): return self._dtype + @property + def data(self): return self._data + + +class DummyTable(BaseTable): + def __init__(self, uid, name, cols: List[BaseColumn]): + self._uid = uid + self._name = name + self._cols = cols + + @property + def unique_identifier(self): return self._uid + @property + def name(self): return self._name + def get_columns(self) -> List[BaseColumn]: return self._cols + def get_df(self) -> pd.DataFrame: return pd.DataFrame({c.name: c.data for c in self._cols}) + @property + def is_empty(self) -> bool: return False + + +class TestGraphNodePropagationAndSF(unittest.TestCase): + + def test_node_equality_and_hash(self): + Node = sf_node_mod.Node + a1 = Node("A", "DB") + a2 = Node("A", "DB") + b = Node("A", "OtherDB") + c = Node("C", "DB") + self.assertTrue(a1 == a2) + self.assertFalse(a1 == b) + self.assertFalse(a1 == c) + # Node.__hash__ uses name only + self.assertEqual(hash(a1), hash(a2)) + self.assertEqual(hash(a1), hash(b)) + self.assertNotEqual(hash(a1), hash(c)) + + def test_nodepair_equality_and_hash(self): + Node = sf_node_mod.Node + NodePair = sf_nodepair_mod.NodePair + n1 = Node("X", "DB") + n2 = Node("Y", "DB") + p1 = NodePair(n1, n2) + p2 = NodePair(n1, n2) + p3 = NodePair(n2, n1) # symmetric equality + self.assertTrue(p1 == p2) + self.assertTrue(p1 == p3) + # Hash stable for identical order (spec is order-insensitive equality, but we check stability) + self.assertEqual(hash(p1), hash(p2)) + + def test_graph_construction_and_type_reuse(self): + # Two int columns -> second should reuse existing type branch; also add a float to create a new type branch + t = DummyTable( + uid="TGUID", + name="T", + cols=[ + DummyColumn(1, "c1", "int", [1, 2]), + DummyColumn(2, "c2", "int", [3, 4]), + DummyColumn(3, "f1", "float", [1.1, 2.2]), + ], + ) + g = sf_graph_mod.Graph(t).graph + self.assertIsInstance(g, nx.DiGraph) + labels = [d.get("label") for *_ , d in g.edges(data=True)] + self.assertIn("name", labels) + self.assertIn("type", labels) + self.assertIn("SQLtype", labels) + + def test_propagation_graph_policies(self): + # Build tiny graphs from two 1-column tables + t1 = DummyTable("SUID", "S", [DummyColumn(1, "A", "int", [1])]) + t2 = DummyTable("TUID", "T", [DummyColumn(2, "B", "int", [2])]) + g1 = sf_graph_mod.Graph(t1).graph + g2 = sf_graph_mod.Graph(t2).graph + + # inverse_average path + pg_avg = sf_prop_mod.PropagationGraph(g1, g2, policy="inverse_average").construct_graph() + 
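+        # Coefficient policies follow the Similarity Flooding paper (an assumption
+        # about this implementation): for an edge label with n1 occurrences on one
+        # side and n2 on the other, inverse_product weights propagation by
+        # 1 / (n1 * n2) while inverse_average uses 2 / (n1 + n2),
+        # e.g. n1=2, n2=1 -> 0.5 vs ~0.667.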
self.assertIsInstance(pg_avg, nx.DiGraph) + + # inverse_product path + pg_prod = sf_prop_mod.PropagationGraph(g1, g2, policy="inverse_product").construct_graph() + self.assertIsInstance(pg_prod, nx.DiGraph) + + # unknown policy -> {} + pg_wrong = sf_prop_mod.PropagationGraph(g1, g2, policy="unknown").construct_graph() + self.assertEqual(pg_wrong, {}) + + def test_similarity_flooding_end_to_end(self): + # Two tiny tables; full pipeline executes and returns a dict + t_src = DummyTable("SUID", "S", [DummyColumn(1, "A", "int", [1]), DummyColumn(3, "C", "float", [1.1])]) + t_tgt = DummyTable("TUID", "T", [DummyColumn(2, "B", "int", [2]), DummyColumn(4, "D", "float", [2.2])]) + + sf = sf_sf_mod.SimilarityFlooding(coeff_policy="inverse_average", formula="formula_c") + res = sf.get_matches(t_src, t_tgt) + self.assertIsInstance(res, dict) + # Not asserting content; weights/edges can vary. Ensures no exceptions and correct type. + + +if __name__ == "__main__": + unittest.main() diff --git a/valentine/algorithms/distribution_based/clustering_utils.py b/valentine/algorithms/distribution_based/clustering_utils.py index 77a668f..d6b5cb7 100644 --- a/valentine/algorithms/distribution_based/clustering_utils.py +++ b/valentine/algorithms/distribution_based/clustering_utils.py @@ -2,7 +2,7 @@ import os import subprocess from functools import lru_cache -from typing import List, Tuple +from typing import Any, Dict, Iterable, List, Sequence, Tuple from .column_model import CorrelationClusteringColumn from .emd_utils import intersection_emd, quantile_emd @@ -11,7 +11,7 @@ from ...utils.utils import convert_data_type -def compute_cutoff_threshold(matrix_c: list, threshold: float): +def compute_cutoff_threshold(matrix_c: list, threshold: float) -> float: """ Algorithm 1 of the paper "Automatic Discovery of Attributes in Relational Databases" from M. Zhang et al. 
[1] This algorithm computes the threshold of a column that determines if any other column is to be considered @@ -30,7 +30,7 @@ def compute_cutoff_threshold(matrix_c: list, threshold: float): The cutoff threshold of the input column """ matrix_c.append({'e': threshold, 'c': 0}) - matrix_c = sorted(matrix_c, key=lambda k: k['e']) + matrix_c.sort(key=lambda k: k['e']) cutoff = 0.0 gap = 0.0 i = 0 @@ -42,10 +42,12 @@ def compute_cutoff_threshold(matrix_c: list, threshold: float): return cutoff -def column_combinations(columns: List[Tuple], - quantiles: int, - tmp_folder_path: str, - intersection: bool = False): +def column_combinations( + columns: List[Tuple[Any, Any, Any, Any]], + quantiles: int, + tmp_folder_path: str, + intersection: bool = False, +) -> Iterable[Tuple[Tuple[Tuple[Any, Any, Any, Any], Tuple[Any, Any, Any, Any]], int, bool, str]]: """ All the unique combinations between all the columns @@ -65,20 +67,22 @@ def column_combinations(columns: List[Tuple], tuple A tuple with ((column_name1, column_name1), quantiles, intersection) """ - c = len(columns) - c_i = 0 - while c_i < c: - _, table_guid_i, _, _ = columns[c_i] - c_j = c_i + 1 - while c_j < c: - _, table_guid_j, _, _ = columns[c_j] - if table_guid_i != table_guid_j: - yield (columns[c_i], columns[c_j]), quantiles, intersection, tmp_folder_path - c_j = c_j + 1 - c_i = c_i + 1 - - -def process_emd(tup: tuple): + from collections import defaultdict + from itertools import product + + groups: Dict[Any, List[Tuple[Any, Any, Any, Any]]] = defaultdict(list) + for item in columns: + _, table_guid, _, _ = item + groups[table_guid].append(item) + + table_guids = list(groups.keys()) + for i, gi in enumerate(table_guids): + for gj in table_guids[i + 1:]: + for ci, cj in product(groups[gi], groups[gj]): + yield (ci, cj), quantiles, intersection, tmp_folder_path + + +def process_emd(tup: tuple) -> Tuple[Tuple[Any, Any], float]: """ Function defining a single quantile_emd process between two columns. 
@@ -103,8 +107,8 @@ def process_emd(tup: tuple): return k, quantile_emd(c1, c2, quantile) -@lru_cache(maxsize=32) -def read_from_cache(file_name: str, tmp_folder_path): +@lru_cache(maxsize=512) +def read_from_cache(file_name: str, tmp_folder_path: str) -> CorrelationClusteringColumn: """ Function that reads from a pickle file lru cache a column after pre-processing @@ -123,7 +127,7 @@ def read_from_cache(file_name: str, tmp_folder_path): return get_column_from_store(file_name, tmp_folder_path) -def unwrap_process_input_tuple(tup: tuple): +def unwrap_process_input_tuple(tup: tuple) -> Tuple[Tuple[Any, Any, Any, Any], Tuple[Any, Any, Any, Any], Tuple[Any, Any], int, bool, str]: """ Helper function that unwraps a tuple to its components and creates a unique key for the column combination @@ -138,7 +142,7 @@ def unwrap_process_input_tuple(tup: tuple): return name_i, name_j, k, quantile, intersection, tmp_folder_path -def insert_to_dict(dc: dict, k: str, v: dict): +def insert_to_dict(dc: Dict[Any, List[Dict[str, Any]]], k: Any, v: Dict[str, Any]) -> None: """ Helper function that instantiates a list to a dictionary key if it is not present and then appends an EMD/ColumnName pair to it @@ -157,7 +161,7 @@ def insert_to_dict(dc: dict, k: str, v: dict): dc[k].append(v) -def transform_dict(dc: dict): +def transform_dict(dc: Dict[Tuple[Any, Any], float]) -> Dict[Any, List[Dict[str, Any]]]: """ Helper function that transforms a dict with composite column combination keys to a dict with column keys and values EMD/ColumnName pairs in a sorted list (ascending based on the EMD value) @@ -167,17 +171,15 @@ def transform_dict(dc: dict): dc : dict the dictionary """ - tmp_dict = dict() - for k, v in dc.items(): - k1, k2 = k - v1 = {'e': v, 'c': k2} - v2 = {'e': v, 'c': k1} - insert_to_dict(tmp_dict, k1, v1) - insert_to_dict(tmp_dict, k2, v2) + tmp_dict: Dict[Any, List[Dict[str, Any]]] = dict() + append = insert_to_dict + for (k1, k2), v in dc.items(): + append(tmp_dict, k1, {'e': v, 'c': k2}) + append(tmp_dict, k2, {'e': v, 'c': k1}) return tmp_dict -def process_columns(tup: tuple): +def process_columns(tup: tuple) -> None: """ Process a pandas dataframe column to a column_model_scale.Column @@ -187,6 +189,7 @@ def process_columns(tup: tuple): tuple containing the information of the column to be processed """ column_name, column_uid, data, source_name, source_guid, quantiles, tmp_folder_path = tup + os.makedirs(tmp_folder_path, exist_ok=True) column = CorrelationClusteringColumn(column_name, column_uid, data, source_name, source_guid, tmp_folder_path) if column.size > 0: column.quantile_histogram = QuantileHistogram(column.long_name, column.ranks, column.size, quantiles) @@ -197,7 +200,7 @@ def process_columns(tup: tuple): del column -def parallel_cutoff_threshold(tup: tuple): +def parallel_cutoff_threshold(tup: tuple) -> List[Tuple[Tuple[Any, Any, Any, Any], Tuple[Any, Any, Any, Any]]]: """ Process the cutoff threshold in parallel for each column @@ -213,11 +216,13 @@ def parallel_cutoff_threshold(tup: tuple): return n_c -def ingestion_column_generator(columns: List[BaseColumn], - table_name: str, - table_guid: object, - quantiles: int, - tmp_folder_path: str): +def ingestion_column_generator( + columns: Sequence[BaseColumn], + table_name: str, + table_guid: object, + quantiles: int, + tmp_folder_path: str, +) -> Iterable[Tuple[str, object, Sequence[Any], str, object, int, str]]: """ Generator of incoming pandas dataframe columns """ @@ -226,10 +231,12 @@ def ingestion_column_generator(columns: 
List[BaseColumn], yield column.name, column.unique_identifier, column.data, table_name, table_guid, quantiles, tmp_folder_path -def cuttoff_column_generator(matrix_a: dict, - columns: List[Tuple[str, str, str, str]], - threshold: float, - tmp_folder_path: str): +def cuttoff_column_generator( + matrix_a: dict, + columns: List[Tuple[str, str, str, str]], + threshold: float, + tmp_folder_path: str, +) -> Iterable[Tuple[dict, CorrelationClusteringColumn, float]]: """ Generator of columns for the cutoff threshold computation """ @@ -240,7 +247,7 @@ def cuttoff_column_generator(matrix_a: dict, yield matrix_a, column, threshold -def generate_global_ranks(data: list, tmp_folder_path: str): +def generate_global_ranks(data: list, tmp_folder_path: str) -> None: """ Function that creates a pickle file with the global ranks of all the values inside the database. @@ -251,13 +258,13 @@ def generate_global_ranks(data: list, tmp_folder_path: str): tmp_folder_path: str The path of the temporary folder that will serve as a cache for the run """ + os.makedirs(tmp_folder_path, exist_ok=True) ranks = unix_sort_ranks(set(data), tmp_folder_path) with open(os.path.join(tmp_folder_path, 'ranks.pkl'), 'wb') as output: pickle.dump(ranks, output, pickle.HIGHEST_PROTOCOL) -def unix_sort_ranks(corpus: set, - tmp_folder_path: str): +def unix_sort_ranks(corpus: set, tmp_folder_path: str) -> Dict[Any, int]: """ Function that takes a corpus sorts it with the unix sort -n command and generates the global ranks for each value in the corpus. @@ -281,36 +288,29 @@ def unix_sort_ranks(corpus: set, for var in corpus: print(str(var), file=out) - with open(sorted_file_path, 'w') as f: + with open(sorted_file_path, 'w', encoding='utf-8') as f: if os.name == 'nt': - subprocess.call(['sort', - unsorted_file_path], - stdout=f) + subprocess.run(['sort', unsorted_file_path], stdout=f, check=True) else: sort_env = os.environ.copy() sort_env['LC_ALL'] = 'C' - subprocess.call(['sort', '-n', - unsorted_file_path], - stdout=f, env=sort_env) - - rank = 1 - ranks = [] + subprocess.run(['sort', '-n', unsorted_file_path], stdout=f, env=sort_env, check=True) + ranks: List[Tuple[Any, int]] = [] with open(sorted_file_path, 'r', encoding='utf-8') as f: - txt = f.read() - for var in txt.splitlines(): - ranks.append((convert_data_type(var.replace('\n', '')), rank)) - rank = rank + 1 + for rank, line in enumerate(f, start=1): + ranks.append((convert_data_type(line.rstrip('\n')), rank)) return dict(ranks) -def get_column_from_store(file_name: str, tmp_folder_path: str): +def get_column_from_store(file_name: str, tmp_folder_path: str) -> CorrelationClusteringColumn: file_path = os.path.join(tmp_folder_path, f'{file_name}.pkl') with open(file_path, 'rb') as pkl_file: data = pickle.load(pkl_file) return data -def make_filename_safe(file_name: str): - return "".join([c for c in file_name if c.isalpha() or c.isdigit() or c == ' ']).rstrip() +@lru_cache(maxsize=4096) +def make_filename_safe(file_name: str) -> str: + return "".join(c for c in file_name if c.isalpha() or c.isdigit() or c == ' ').rstrip() diff --git a/valentine/algorithms/similarity_flooding/similarity_flooding.py b/valentine/algorithms/similarity_flooding/similarity_flooding.py index adc9ef5..6d817ee 100644 --- a/valentine/algorithms/similarity_flooding/similarity_flooding.py +++ b/valentine/algorithms/similarity_flooding/similarity_flooding.py @@ -1,7 +1,7 @@ from typing import Dict, Tuple +import math from jellyfish import levenshtein_distance -import math from .graph import Graph from 
.node_pair import NodePair @@ -13,234 +13,138 @@ class SimilarityFlooding(BaseMatcher): - def __init__(self, coeff_policy='inverse_average', formula='formula_c'): self.__coeff_policy = coeff_policy - # formula used to update similarities of map-pairs as shown in page 10 of the paper self.__formula = formula self.__graph1 = None self.__graph2 = None self.__initial_map = None - def get_matches(self, - source_input: BaseTable, - target_input: BaseTable - ) -> Dict[Tuple[Tuple[str, str], Tuple[str, str]], float]: + def get_matches( + self, + source_input: BaseTable, + target_input: BaseTable + ) -> Dict[Tuple[Tuple[str, str], Tuple[str, str]], float]: self.__graph1 = Graph(source_input).graph self.__graph2 = Graph(target_input).graph self.__calculate_initial_mapping() matches = self.__fixpoint_computation(100, 1e-4) - filtered_matches = self.__filter_map(matches) - return self.__format_output(filtered_matches) def __calculate_initial_mapping(self): - self.__initial_map = {} - + init_map = {} for n1 in self.__graph1.nodes(): + n1_name = n1.name for n2 in self.__graph2.nodes(): - if n1.name[0:6] == "NodeID" or n2.name[0:6] == "NodeID": - self.__initial_map[NodePair(n1, n2)] = 0.0 + n2_name = n2.name + if n1_name.startswith("NodeID") or n2_name.startswith("NodeID"): + sim = 0.0 else: - similarity = normalize_distance(levenshtein_distance(n1.name, n2.name), n1.name, n2.name) - self.__initial_map[NodePair(n1, n2)] = similarity + sim = normalize_distance( + levenshtein_distance(n1_name, n2_name), n1_name, n2_name + ) + init_map[NodePair(n1, n2)] = sim + self.__initial_map = init_map @staticmethod def __get_euc_residual_vector(previous_map, next_map): - # residual vector - residual_vector = {key: math.pow(previous_map.get(key, 0) - next_map.get(key, 0), 2) - for key in set(previous_map) | set(next_map)} - # compute Euclidean length of residual vector - return math.sqrt(sum(residual_vector.values())) + keys = set(previous_map) | set(next_map) + return math.sqrt( + sum((previous_map.get(k, 0) - next_map.get(k, 0)) ** 2 for k in keys) + ) def __get_next_map(self, previous_map, p_graph, formula): - next_map = dict() - - max_map = 0 + next_map = {} + max_val = 0 + init_map = self.__initial_map for n in p_graph.nodes(): if formula == 'formula_a': - map_sim = self.__initial_map[n] + map_sim = init_map[n] elif formula == 'formula_b': map_sim = 0 else: map_sim = previous_map[n] - for e in p_graph.in_edges(n): - edge_data = p_graph.get_edge_data(e[0], e[1]) - - weight = edge_data.get('weight') - - if formula == 'formula_a' or formula == 'basic': - map_sim += weight * previous_map[e[0]] + w = p_graph.get_edge_data(e[0], e[1]).get('weight') + if formula in ('formula_a', 'basic'): + map_sim += w * previous_map[e[0]] elif formula == 'formula_b': - map_sim += weight * self.__initial_map[e[0]] + map_sim += w * init_map[e[0]] else: - map_sim += self.__initial_map[e[0]] + weight * (previous_map[e[0]] + self.__initial_map[e[0]]) - - if map_sim > max_map: - max_map = map_sim - + map_sim += init_map[e[0]] + w * (previous_map[e[0]] + init_map[e[0]]) + if map_sim > max_val: + max_val = map_sim next_map[n] = map_sim - for key in next_map: - next_map[key] = next_map[key] / max_map - + inv_max = 1.0 / max_val if max_val > 0 else 1.0 + for k in next_map: + next_map[k] *= inv_max return next_map def __fixpoint_computation(self, num_iter, residual_diff): + p_g = PropagationGraph(self.__graph1, self.__graph2, self.__coeff_policy).construct_graph() - p_g_builder = PropagationGraph(self.__graph1, self.__graph2, self.__coeff_policy) 
- - p_g = p_g_builder.construct_graph() - - if self.__formula == 'basic': - # using the basing formula - - previous_map = self.__initial_map.copy() - - for _ in range(0, num_iter): - next_map = self.__get_next_map(previous_map, p_g, self.__formula) - - euc_len = self.__get_euc_residual_vector(previous_map, next_map) - - if euc_len <= residual_diff: - # check whether the algo has converged - break - - previous_map = next_map.copy() - - elif self.__formula == 'formula_a': - # using formula A - previous_map = self.__initial_map.copy() - - for _ in range(0, num_iter): - next_map = self.__get_next_map(previous_map, p_g, self.__formula) - - euc_len = self.__get_euc_residual_vector(previous_map, next_map) - - if euc_len <= residual_diff: # check whether the algo has converged - break - - previous_map = next_map.copy() - - elif self.__formula == 'formula_b': - # using formula B - next_map = self.__get_next_map(None, p_g, self.__formula) - previous_map = next_map.copy() - - for _ in range(0, num_iter-1): - next_map = self.__get_next_map(previous_map, p_g, self.__formula) - - euc_len = self.__get_euc_residual_vector(previous_map, next_map) - - if euc_len <= residual_diff: - # check whether the algo has converged - break - + def iterate(previous_map, formula, iters): + for _ in range(iters): + next_map = self.__get_next_map(previous_map, p_g, formula) + if self.__get_euc_residual_vector(previous_map, next_map) <= residual_diff: + return next_map previous_map = next_map.copy() + return previous_map - elif self.__formula == 'formula_c': - # using formula C which is claimed to be the best one - previous_map = self.__initial_map.copy() - next_map = self.__get_next_map(previous_map, p_g, 'formula_b') - previous_map = next_map.copy() - - for _ in range(0, num_iter-1): - next_map = self.__get_next_map(previous_map, p_g, self.__formula) + if self.__formula == 'basic': + return iterate(self.__initial_map.copy(), self.__formula, num_iter) - euc_len = self.__get_euc_residual_vector(previous_map, next_map) + if self.__formula == 'formula_a': + return iterate(self.__initial_map.copy(), self.__formula, num_iter) - if euc_len <= residual_diff: - # check whether the algo has converged - break + if self.__formula == 'formula_b': + first = self.__get_next_map(None, p_g, self.__formula) + return iterate(first.copy(), self.__formula, num_iter - 1) - previous_map = next_map.copy() + if self.__formula == 'formula_c': + start = self.__get_next_map(self.__initial_map.copy(), p_g, 'formula_b') + return iterate(start.copy(), self.__formula, num_iter - 1) - else: - print("Wrong formula option!") - return {} - # the dictionary storing the final similarities of map pairs - return previous_map + print("Wrong formula option!") + return {} def __filter_map(self, prev_map): - - """ - Function that filters the matching results, so that only pairs of columns remain - :param prev_map: the matching results of the iterative algorithm - :return: the filtered matchings - """ - - filtered_map = prev_map.copy() - - for key in prev_map: - - flag = False - if key.node1.name[0:6] == 'NodeID': - - if key.node1 in self.__graph1.nodes(): - - for e in self.__graph1.out_edges(key.node1): - - if e[1].name == 'Column': - flag = True - - break - else: - - for e in self.__graph2.out_edges(key.node1): - - if e[1].name == 'Column': - flag = True - - break - else: - - del filtered_map[key] + filtered = prev_map.copy() + g1_nodes = self.__graph1.nodes() + g1_out_edges = self.__graph1.out_edges + g2_out_edges = self.__graph2.out_edges + + for key in 
prev_map.keys(): + n1 = key.node1 + n2 = key.node2 + if not n1.name.startswith('NodeID'): + filtered.pop(key, None) continue - if flag: - - flag = False - - if key.node2.name[0:6] == 'NodeID': - - if key.node2 in self.__graph1.nodes(): - - for e in self.__graph1.out_edges(key.node2): - - if e[1].name == 'Column': - flag = True - - break - else: - for e in self.__graph2.out_edges(key.node2): - - if e[1].name == 'Column': - flag = True - - break - else: - - del filtered_map[key] + edges = g1_out_edges(n1) if n1 in g1_nodes else g2_out_edges(n1) + if not any(e[1].name == 'Column' for e in edges): + filtered.pop(key, None) continue - if not flag: + if not n2.name.startswith('NodeID'): + filtered.pop(key, None) + continue - del filtered_map[key] + edges = g1_out_edges(n2) if n2 in g1_nodes else g2_out_edges(n2) + if not any(e[1].name == 'Column' for e in edges): + filtered.pop(key, None) - return filtered_map + return filtered def __format_output(self, matches) -> Dict[Tuple[Tuple[str, str], Tuple[str, str]], float]: output = {} - sorted_maps = {k: v for k, v in sorted(matches.items(), key=lambda item: -item[1])} - for key in sorted_maps.keys(): + sorted_maps = sorted(matches.items(), key=lambda item: -item[1]) + for key, sim in sorted_maps: s_long_name, t_long_name = self.__get_node_name(key) - similarity = sorted_maps[key] s_t_name, _, s_c_name, _ = s_long_name t_t_name, _, t_c_name, _ = t_long_name - match = Match(t_t_name, t_c_name, - s_t_name, s_c_name, - float(similarity)) + match = Match(t_t_name, t_c_name, s_t_name, s_c_name, float(sim)) output.update(match.to_dict) return output @@ -248,15 +152,16 @@ def __get_node_name(self, key): return self.__get_attribute_tuple(key.node1), self.__get_attribute_tuple(key.node2) def __get_attribute_tuple(self, node): - column_name = None - if node in self.__graph1.nodes(): - for e in self.__graph1.out_edges(node): - links = self.__graph1.get_edge_data(e[0], e[1]) - if links.get('label') == "name": - column_name = e[1].long_name + g1_nodes = self.__graph1.nodes() + g1_out_edges = self.__graph1.out_edges + g2_out_edges = self.__graph2.out_edges + if node in g1_nodes: + edges = g1_out_edges(node) + get_data = self.__graph1.get_edge_data else: - for e in self.__graph2.out_edges(node): - links = self.__graph2.get_edge_data(e[0], e[1]) - if links.get('label') == "name": - column_name = e[1].long_name - return column_name + edges = g2_out_edges(node) + get_data = self.__graph2.get_edge_data + for e in edges: + if get_data(e[0], e[1]).get('label') == "name": + return e[1].long_name + return None diff --git a/valentine/metrics/__init__.py b/valentine/metrics/__init__.py index b05135d..a81ddbb 100644 --- a/valentine/metrics/__init__.py +++ b/valentine/metrics/__init__.py @@ -1,5 +1,5 @@ -from valentine.metrics.base_metric import Metric -from .metrics import * +from .base_metric import Metric +from .metrics import Precision, Recall, F1Score, PrecisionTopNPercent, RecallAtSizeofGroundTruth # Some predefined sets of metrics METRICS_ALL = {metric() for metric in Metric.__subclasses__()} # Note: will also catch newly defined metrics diff --git a/valentine/metrics/metrics.py b/valentine/metrics/metrics.py index b184842..b458aa6 100644 --- a/valentine/metrics/metrics.py +++ b/valentine/metrics/metrics.py @@ -1,128 +1,143 @@ -"""Here one can find some common metric implementations. Custom metrics can be -made by subclassing the `Metric` ABC. Marking them with the dataclass decorator -allows for proper hashing/equals without the boilerplate. 
+"""Common metric implementations for Valentine. + +Custom metrics can be created by subclassing :class:`Metric`. We use ``@dataclass`` +with ``frozen=True`` so instances are hashable and comparable without boilerplate. """ -from .base_metric import Metric -from .metric_helpers import * + +from __future__ import annotations + from dataclasses import dataclass +from typing import Any, Sequence, Tuple + +from .base_metric import Metric +from .metric_helpers import get_fp, get_tp_fn + +# Public exports +__all__ = [ + "Precision", + "Recall", + "F1Score", + "PrecisionTopNPercent", + "RecallAtSizeofGroundTruth", +] + +# Ground-truth is expressed as pairs of (left_col_name, right_col_name) +GroundTruth = Sequence[Tuple[str, str]] + + +def _safe_div(numerator: float, denominator: float) -> float: + """Return numerator/denominator, guarding against division by zero.""" + return numerator / denominator if denominator else 0.0 @dataclass(eq=True, frozen=True) class Precision(Metric): - """Metric for calculating precision. + """Precision = TP / (TP + FP). Attributes ---------- one_to_one : bool - Whether to apply the one-to-one filter to the MatcherResults first. + Whether to apply the one-to-one filter to the MatcherResults first. """ one_to_one: bool = True - def apply(self, matches, ground_truth): + def apply(self, matches: Any, ground_truth: GroundTruth) -> dict[str, float]: if self.one_to_one: matches = matches.one_to_one() tp, _ = get_tp_fn(matches, ground_truth) fp = get_fp(matches, ground_truth) - precision = 0 - if tp + fp > 0: - precision = tp / (tp + fp) - + precision = _safe_div(tp, tp + fp) return self.return_format(precision) @dataclass(eq=True, frozen=True) class Recall(Metric): - """Metric for calculating recall. + """Recall = TP / (TP + FN). Attributes ---------- one_to_one : bool - Whether to apply the one-to-one filter to the MatcherResults first. + Whether to apply the one-to-one filter to the MatcherResults first. """ one_to_one: bool = True - def apply(self, matches, ground_truth): + def apply(self, matches: Any, ground_truth: GroundTruth) -> dict[str, float]: if self.one_to_one: matches = matches.one_to_one() tp, fn = get_tp_fn(matches, ground_truth) - recall = 0 - if tp + fn > 0: - recall = tp / (tp + fn) - + recall = _safe_div(tp, tp + fn) return self.return_format(recall) @dataclass(eq=True, frozen=True) class F1Score(Metric): - """Metric for calculating f1 score. + """F1 score = 2 * (Precision * Recall) / (Precision + Recall). Attributes ---------- one_to_one : bool - Whether to apply the one-to-one filter to the MatcherResults first. + Whether to apply the one-to-one filter to the MatcherResults first. """ one_to_one: bool = True - def apply(self, matches, ground_truth): + def apply(self, matches: Any, ground_truth: GroundTruth) -> dict[str, float]: if self.one_to_one: matches = matches.one_to_one() tp, fn = get_tp_fn(matches, ground_truth) fp = get_fp(matches, ground_truth) - f1 = 0 - if tp > 0: - pr = tp / (tp + fp) - re = tp / (tp + fn) - f1 = 2 * ((pr * re) / (pr + re)) + precision = _safe_div(tp, tp + fp) + recall = _safe_div(tp, tp + fn) + f1 = _safe_div(2.0 * (precision * recall), (precision + recall)) return self.return_format(f1) @dataclass(eq=True, frozen=True) class PrecisionTopNPercent(Metric): - """Metric for calculating precision of the top N percent of matches. + """Precision restricted to the top-N% of predicted matches by score. Attributes ---------- one_to_one : bool - Whether to apply the one-to-one filter to the MatcherResults first. 
+ Whether to apply the one-to-one filter to the MatcherResults first. n : int - The percent of matches to consider. + Percentage of matches to consider (0–100). """ one_to_one: bool = True n: int = 10 - def name(self): - return super().name().replace('N', str(self.n)) + def name(self) -> str: + # Replace the 'N' in the base name with the chosen percent, e.g. "PrecisionTop70Percent". + return super().name().replace("N", str(self.n)) - def apply(self, matches, ground_truth): + def apply(self, matches: Any, ground_truth: GroundTruth) -> dict[str, float]: if self.one_to_one: matches = matches.one_to_one() - n_matches = matches.take_top_percent(self.n) + # Clamp N to a sensible range without mutating the dataclass. + n_clamped = min(100, max(0, int(self.n))) + n_matches = matches.take_top_percent(n_clamped) tp, _ = get_tp_fn(n_matches, ground_truth) fp = get_fp(n_matches, ground_truth) - precision_top_n_percent = 0 - if tp + fp > 0: - precision_top_n_percent = tp / (tp + fp) - + precision_top_n_percent = _safe_div(tp, tp + fp) return self.return_format(precision_top_n_percent) @dataclass(eq=True, frozen=True) class RecallAtSizeofGroundTruth(Metric): - """Metric for calculating recall at the size of the ground truth. + """Recall when considering the top-|GT| predictions. + + This simulates selecting as many predictions as there are gold pairs and + computing recall at that cutoff: TP / (TP + FN) where the candidate set is + the top-``len(ground_truth)`` matches by score. """ - def apply(self, matches, ground_truth): + def apply(self, matches: Any, ground_truth: GroundTruth) -> dict[str, float]: n_matches = matches.take_top_n(len(ground_truth)) - tp, fn = get_tp_fn(n_matches, ground_truth) - recall = 0 - if tp + fn > 0: - recall = tp / (tp + fn) - + recall = _safe_div(tp, tp + fn) return self.return_format(recall)
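+
+# Worked example of the formulas above (illustrative numbers only, kept as a
+# comment so nothing runs at import time): with tp=3, fp=1, fn=2,
+#   precision = _safe_div(3, 3 + 1) = 0.75
+#   recall    = _safe_div(3, 3 + 2) = 0.6
+#   f1        = _safe_div(2 * 0.75 * 0.6, 0.75 + 0.6) = 0.9 / 1.35 = 0.6666...
+# i.e. the harmonic mean F1 = 2PR / (P + R), matching expected_f1_1to1 in the tests.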