diff --git a/Datasets/PrescriptiveAnalysis1/SPADE/example2.csv b/Datasets/PrescriptiveAnalysis1/SPADE/example2.csv
new file mode 100644
index 0000000..c20adc2
--- /dev/null
+++ b/Datasets/PrescriptiveAnalysis1/SPADE/example2.csv
@@ -0,0 +1,11 @@
+NAME,INVOICEDATE,PRODUCTNAME
+1,1/1/2025,"C,D"
+1,1/3/2025,"A,B,C"
+1,1/4/2025,"A,B,F"
+1,1/4/2025,"A,C,D,F"
+2,1/1/2025,"A,B,F"
+2,1/1/2025,E
+3,1/1/2025,"A,B,F"
+4,1/2/2025,"D,H,G"
+4,1/2/2025,B
+4,1/3/2025,"A,G,H"
diff --git a/Datasets/PrescriptiveAnalysis1/SPADE/groceries_own .csv b/Datasets/PrescriptiveAnalysis1/SPADE/groceries_own .csv
new file mode 100644
index 0000000..e0d8529
--- /dev/null
+++ b/Datasets/PrescriptiveAnalysis1/SPADE/groceries_own .csv
@@ -0,0 +1,31 @@
+NAME,INVOICEDATE,PRODUCTNAME
+1,01-01-2025,Milk
+1,01-01-2025,Bread
+1,01-02-2025,Eggs
+1,01-02-2025,Apples
+1,01-03-2025,Bananas
+1,01-03-2025,Orange Juice
+1,01-04-2025,Cereal
+2,01-01-2025,Butter
+2,01-01-2025,Cheese
+2,01-02-2025,Yogurt
+2,01-02-2025,Chicken
+2,01-03-2025,Beef
+2,01-03-2025,Pasta
+3,01-01-2025,Tomato Sauce
+3,01-01-2025,Olive Oil
+3,01-02-2025,Rice
+3,01-02-2025,Potatoes
+3,01-03-2025,Carrots
+3,01-03-2025,Broccoli
+4,01-01-2025,Toothpaste
+4,01-01-2025,Milk
+4,01-02-2025,Bread
+4,01-02-2025,Eggs
+4,01-03-2025,Apples
+4,01-03-2025,Bananas
+5,01-01-2025,Orange Juice
+5,01-01-2025,Cereal
+5,01-02-2025,Butter
+5,01-02-2025,Cheese
+5,01-03-2025,Yogurt
\ No newline at end of file
diff --git a/src/PrescriptiveAnalysis1/Backend/readme.txt b/src/PrescriptiveAnalysis1/Backend/readme.txt
index 8a0a1b6..a76590c 100644
--- a/src/PrescriptiveAnalysis1/Backend/readme.txt
+++ b/src/PrescriptiveAnalysis1/Backend/readme.txt
@@ -117,3 +117,21 @@ File: "groceries_own.csv"
 Random Dataset
 Min Support = 0.2 / 0.3 / 0.4
 ----------------------------------------------------------------------------------------------------
+
+
+
+-SPADE
+----------------------------------------------------------------------------------------------------
+File: "example2.csv"
+Example question from the instructor's PPT
+Min Support = 0.5
+(Answer cross-checked)
+
+File: "groceries_own.csv"
+Random Dataset (same as used for GSP)
+Min Support = 0.3
+(Answer cross-checked)
+
+The file must have columns named:
+"NAME", "INVOICEDATE", "PRODUCTNAME"
+----------------------------------------------------------------------------------------------------
diff --git a/src/PrescriptiveAnalysis1/Backend/spade.py b/src/PrescriptiveAnalysis1/Backend/spade.py
new file mode 100644
index 0000000..0c58709
--- /dev/null
+++ b/src/PrescriptiveAnalysis1/Backend/spade.py
@@ -0,0 +1,446 @@
+import pandas as pd
+from collections import defaultdict
+import traceback
+
+def preprocess_data_vertical(df):
+    """
+    Convert horizontal data format to vertical format (SID, EID, item).
+ SID = Sequence ID (from NAME column) + EID = Event ID (timestamp/order of events) + """ + try: + if 'NAME' not in df.columns: + return None, "Error: NAME column missing in dataset" + df = df.copy() + df['SID'] = df['NAME'].astype(str) + + if df['SID'].isnull().any(): + return None, "Error: Invalid or missing NAME values" + + if 'INVOICEDATE' in df.columns: + try: + df['INVOICEDATE'] = pd.to_datetime(df['INVOICEDATE'], errors='coerce') + except: + df['INVOICEDATE'] = pd.to_datetime(df['INVOICEDATE'], errors='coerce', dayfirst=True) + df_sorted = df.sort_values(['SID', 'INVOICEDATE']) + else: + df_sorted = df.sort_values(['SID']) + + df_sorted['EID'] = df_sorted.groupby('SID').cumcount() + 1 + + vertical_format = [] + for _, row in df_sorted.iterrows(): + if isinstance(row['PRODUCTNAME'], str) and ',' in row['PRODUCTNAME']: + for item in row['PRODUCTNAME'].split(','): + vertical_format.append({ + 'SID': row['SID'], + 'EID': row['EID'], + 'item': item.strip() + }) + else: + vertical_format.append({ + 'SID': row['SID'], + 'EID': row['EID'], + 'item': str(row['PRODUCTNAME']).strip() + }) + + return pd.DataFrame(vertical_format), None + except Exception as e: + return None, f"Error in preprocessing data: {str(e)}" + +def get_transaction_table(vertical_df): + """ + Create a transaction table by grouping items by SID and EID. + """ + try: + transactions = vertical_df.groupby(['SID', 'EID'])['item'].apply(lambda x: set(x)).reset_index() + transactions.columns = ['Customer ID (SID)', 'Event ID (EID)', 'Items'] + return transactions, None + except Exception as e: + return None, f"Error in creating transaction table: {str(e)}" + +def create_idlists(vertical_df): + """Create ID-lists for each item (item, SID, EID).""" + try: + idlists = defaultdict(list) + for _, row in vertical_df.iterrows(): + idlists[row['item']].append((row['SID'], row['EID'])) + return idlists, None + except Exception as e: + return None, f"Error in creating ID-lists: {str(e)}" + +def calculate_support(pattern, transactions_df): + """ + Calculate support by checking pattern in transaction table. 
+ Support = (number of SIDs containing pattern) / (total SIDs) + """ + try: + total_sids = transactions_df['Customer ID (SID)'].nunique() + if total_sids == 0: + return 0 + + matching_sids = set() + grouped = transactions_df.groupby('Customer ID (SID)') + + if isinstance(pattern, frozenset): + pattern_items = set(pattern) + for sid, group in grouped: + for _, row in group.iterrows(): + if pattern_items.issubset(row['Items']): + matching_sids.add(sid) + break + elif isinstance(pattern, tuple): + for sid, group in grouped: + group = group.sort_values('Event ID (EID)') + found = [False] * len(pattern) + current_pos = 0 + for _, row in group.iterrows(): + items = row['Items'] + if current_pos < len(pattern): + current_element = pattern[current_pos] + element_items = set(current_element) if isinstance(current_element, frozenset) else {current_element} + if element_items.issubset(items): + found[current_pos] = True + current_pos += 1 + if all(found): + matching_sids.add(sid) + + return len(matching_sids) / total_sids if total_sids > 0 else 0 + except Exception as e: + return 0 + +def generate_1_sequences(transactions_df, min_support): + """Generate frequent 1-sequences using transaction table.""" + try: + unique_items = set() + for items in transactions_df['Items']: + unique_items.update(items) + + frequent_1_sequences = [] + for item in unique_items: + pattern = frozenset([item]) + support = calculate_support(pattern, transactions_df) + if support >= min_support: + frequent_1_sequences.append((pattern, support * transactions_df['Customer ID (SID)'].nunique())) + return frequent_1_sequences, None + except Exception as e: + return None, f"Error in generating 1-sequences: {str(e)}" + +def join_idlists(idlist1=None, idlist2=None, join_type='temporal', first_itemset=None, second_itemset=None, idlists=None): + """ + Join ID-lists based on join type: + - 'temporal': sequence extension (different events) + - 'itemset': itemset extension (same event) + - 'sequence_itemset': sequence -> itemset or itemset -> itemset + """ + result = [] + + if join_type == 'sequence_itemset' and first_itemset is not None and second_itemset is not None and idlists is not None: + first_items = sorted(list(first_itemset)) if isinstance(first_itemset, (frozenset, set)) else [first_itemset] + second_items = sorted(list(second_itemset)) if isinstance(second_itemset, (frozenset, set)) else [second_itemset] + + first_idlist = [(sid, eid) for sid, eid in idlists[first_items[0]]] + for item in first_items[1:]: + next_idlist = [(sid, eid) for sid, eid in idlists[item]] + first_idlist = [(sid, eid) for sid, eid in first_idlist if (sid, eid) in next_idlist] + + second_idlist = [(sid, eid) for sid, eid in idlists[second_items[0]]] + for item in second_items[1:]: + next_idlist = [(sid, eid) for sid, eid in idlists[item]] + second_idlist = [(sid, eid) for sid, eid in second_idlist if (sid, eid) in next_idlist] + + first_by_sid = defaultdict(list) + for sid, eid in sorted(first_idlist, key=lambda x: (x[0], x[1])): + first_by_sid[sid].append(eid) + + sid_added = set() + for sid, eid2 in sorted(second_idlist, key=lambda x: (x[0], x[1])): + if sid in first_by_sid: + for eid1 in first_by_sid[sid]: + if eid2 > eid1 and sid not in sid_added: + result.append((sid, eid2)) + sid_added.add(sid) + break + elif join_type == 'temporal': + first_by_sid = defaultdict(list) + for sid, eid in sorted(idlist1, key=lambda x: (x[0], x[1])): + first_by_sid[sid].append(eid) + sid_added = set() + for sid, eid2 in sorted(idlist2, key=lambda x: (x[0], x[1])): + if 
sid in first_by_sid and sid not in sid_added: + for eid1 in first_by_sid[sid]: + if eid2 > eid1: + result.append((sid, eid2)) + sid_added.add(sid) + break + elif join_type == 'itemset': + sid_eid_set = set(idlist2) + for sid, eid in idlist1: + if (sid, eid) in sid_eid_set: + result.append((sid, eid)) + + return result + +def generate_candidate_k_sequences(frequent_sequences_k_minus_1, k, idlists, transactions_df): + """Generate candidate k-sequences from frequent (k-1)-sequences.""" + try: + candidates = [] + seen_itemsets = set() + seen_sequences = set() + + itemsets = [(p, s) for p, s in frequent_sequences_k_minus_1 if isinstance(p, frozenset)] + sequences = [(p, s) for p, s in frequent_sequences_k_minus_1 if isinstance(p, tuple)] + + # Collect single frequent items + single_items = [] + for p, _ in frequent_sequences_k_minus_1: + if isinstance(p, frozenset) and len(p) == 1: + single_items.append(list(p)[0]) + + if k == 2: + items = [seq for seq, _ in frequent_sequences_k_minus_1] + for i, item_i in enumerate(items): + for j, item_j in enumerate(items[i+1:], start=i+1): + item_i_str = list(item_i)[0] + item_j_str = list(item_j)[0] + if item_i_str == item_j_str: + continue + + idlist_i = idlists[item_i_str] + idlist_j = idlists[item_j_str] + + itemset_tuple = tuple(sorted([item_i_str, item_j_str])) + if itemset_tuple not in seen_itemsets: + new_itemset = frozenset(itemset_tuple) + new_idlist = join_idlists(idlist_i, idlist_j, join_type='itemset') + if new_idlist: + candidates.append((new_itemset, new_idlist)) + seen_itemsets.add(itemset_tuple) + + new_sequence = (item_i_str, item_j_str) + new_idlist = join_idlists(idlist_i, idlist_j, join_type='temporal') + if new_sequence not in seen_sequences and new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(new_sequence) + + new_sequence = (item_j_str, item_i_str) + new_idlist = join_idlists(idlist_j, idlist_i, join_type='temporal') + if new_sequence not in seen_sequences and new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(new_sequence) + else: + # Itemset joins + for i, (itemset_i, _) in enumerate(itemsets): + for j, (itemset_j, _) in enumerate(itemsets[i+1:], start=i+1): + items_i = sorted(list(itemset_i)) + items_j = sorted(list(itemset_j)) + if items_i[:-1] == items_j[:-1]: + new_items = sorted(list(itemset_i) + [items_j[-1]]) + new_itemset = frozenset(new_items) + itemset_tuple = tuple(new_items) + if itemset_tuple not in seen_itemsets: + new_idlist = join_idlists(idlists[items_i[0]], idlists[items_j[-1]], join_type='itemset') + for item in new_items[1:-1]: + next_idlist = idlists[item] + new_idlist = [(sid, eid) for sid, eid in new_idlist if (sid, eid) in next_idlist] + if new_idlist: + candidates.append((new_itemset, new_idlist)) + seen_itemsets.add(itemset_tuple) + + # Sequence joins + for i, (seq_i, _) in enumerate(sequences): + for j, (seq_j, _) in enumerate(sequences): + if i == j: + continue + if seq_i[:-1] == seq_j[:-1]: + new_sequence = seq_i + (seq_j[-1],) + if new_sequence not in seen_sequences: + last_item_i = seq_i[-1] if isinstance(seq_i[-1], str) else sorted(seq_i[-1])[0] + last_item_j = seq_j[-1] if isinstance(seq_j[-1], str) else sorted(seq_j[-1])[0] + new_idlist = join_idlists(idlists[last_item_i], idlists[last_item_j], join_type='temporal') + if new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(new_sequence) + + # Sequence -> Itemset + for seq, _ in sequences: + last_seq_element = seq[-1] + last_items = [last_seq_element] if 
isinstance(last_seq_element, str) else sorted(last_seq_element) + for itemset, _ in itemsets: + if len(seq) == 1: + new_sequence = (last_seq_element, itemset) + else: + new_sequence = seq[:-1] + (itemset,) + sequence_tuple = new_sequence + if sequence_tuple not in seen_sequences: + new_idlist = join_idlists( + idlists[last_items[0]], None, + join_type='sequence_itemset', + first_itemset=frozenset(last_items), + second_itemset=itemset, + idlists=idlists + ) + if new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(sequence_tuple) + + # Itemset -> Sequence + for itemset, _ in itemsets: + itemset_items = sorted(itemset) + for seq, _ in sequences: + first_seq_element = seq[0] + first_items = [first_seq_element] if isinstance(first_seq_element, str) else sorted(first_seq_element) + new_sequence = (itemset,) + seq[1:] + sequence_tuple = new_sequence + if sequence_tuple not in seen_sequences: + new_idlist = join_idlists( + idlists[itemset_items[0]], None, + join_type='sequence_itemset', + first_itemset=frozenset(itemset_items), + second_itemset=frozenset(first_items), + idlists=idlists + ) + if new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(sequence_tuple) + + # Itemset -> Single Item + if k == 3: + for itemset, _ in itemsets: + if len(itemset) >= 2: + itemset_items = sorted(itemset) + for single_item in single_items: + new_sequence = (itemset, single_item) + sequence_tuple = new_sequence + if sequence_tuple not in seen_sequences: + new_idlist = join_idlists( + idlists[itemset_items[0]], None, + join_type='sequence_itemset', + first_itemset=frozenset(itemset_items), + second_itemset=frozenset([single_item]), + idlists=idlists + ) + if new_idlist: + candidates.append((new_sequence, new_idlist)) + seen_sequences.add(sequence_tuple) + + return candidates, None + except Exception as e: + return None, f"Error in generating candidate {k}-sequences: {str(e)}" + +def filter_frequent_sequences(candidates, min_support, transactions_df): + """Filter candidates to get frequent sequences using transaction table.""" + try: + frequent_sequences = [] + seen_patterns = set() + for pattern, idlist in candidates: + support = calculate_support(pattern, transactions_df) + if support >= min_support: + pattern_key = pattern if isinstance(pattern, tuple) else tuple(sorted(pattern)) + if pattern_key not in seen_patterns: + frequent_sequences.append((pattern, support * transactions_df['Customer ID (SID)'].nunique())) + seen_patterns.add(pattern_key) + return frequent_sequences, None + except Exception as e: + return None, f"Error in filtering frequent sequences: {str(e)}" + +def format_pattern(pattern): + """Format a pattern for readability.""" + if isinstance(pattern, frozenset): + return f"{{{', '.join(sorted(pattern))}}}" + elif isinstance(pattern, tuple): + return f"<{' -> '.join([format_pattern(p) if isinstance(p, frozenset) else p for p in pattern])}>" + return str(pattern) + +def get_pattern_length(pattern): + """Get length of a pattern (number of items).""" + if isinstance(pattern, frozenset): + return len(pattern) + elif isinstance(pattern, tuple): + return sum(1 if isinstance(p, str) else len(p) for p in pattern) + return 1 + +def run_spade_analysis(df, min_support): + """ + Main SPADE algorithm implementation with enhanced output. 
+ Returns: transactions_df, detailed_results, all_frequent_df, error + """ + try: + vertical_df, error = preprocess_data_vertical(df) + if error: + return None, None, None, error + + transactions_df, error = get_transaction_table(vertical_df) + if error: + return None, None, None, error + + idlists, error = create_idlists(vertical_df) + if error: + return None, None, None, error + + frequent_1, error = generate_1_sequences(transactions_df, min_support) + if error: + return None, None, None, error + + frequent_1_df = pd.DataFrame([ + (format_pattern(seq), support) + for seq, support in sorted(frequent_1, key=lambda x: str(x[0])) + ], columns=["Pattern", "Support"]) + + all_frequent = list(frequent_1) + all_frequent_by_level = {1: frequent_1} + + detailed_results = { + "vertical_format_sample": vertical_df, + "transactions": transactions_df, + "total_sequences": transactions_df['Customer ID (SID)'].nunique(), + "min_support": min_support, + "frequent_1": frequent_1_df, + "candidates": [], + "frequent": [] + } + + k = 2 + while True: + candidates_k, error = generate_candidate_k_sequences(all_frequent_by_level.get(k-1, []), k, idlists, transactions_df) + if error: + return None, None, None, error + + if not candidates_k: + break + + candidates_df = pd.DataFrame([ + (format_pattern(seq), len(set(sid for sid, _ in idlist))) + for seq, idlist in sorted(candidates_k, key=lambda x: str(x[0])) + ], columns=["Pattern", "ID-List Length"]) + detailed_results["candidates"].append((k, candidates_df)) + + frequent_k, error = filter_frequent_sequences(candidates_k, min_support, transactions_df) + if error: + return None, None, None, error + + if not frequent_k: + break + + all_frequent_by_level[k] = frequent_k + frequent_k_df = pd.DataFrame([ + (format_pattern(seq), support) + for seq, support in sorted(frequent_k, key=lambda x: str(x[0])) + ], columns=["Pattern", "Support"]) + detailed_results["frequent"].append((k, frequent_k_df)) + + all_frequent.extend(frequent_k) + k += 1 + + all_frequent_df = pd.DataFrame( + [(format_pattern(seq), support, "Itemset" if isinstance(seq, frozenset) else "Sequence", get_pattern_length(seq)) + for seq, support in sorted(all_frequent, key=lambda x: (get_pattern_length(x[0]), isinstance(x[0], frozenset), str(x[0])))], + columns=["Pattern", "Support", "Pattern Type", "Length"] + ) + + detailed_results["all_frequent"] = all_frequent_df + return transactions_df, detailed_results, all_frequent_df, None + + except Exception as e: + error_msg = f"Error in SPADE analysis: {str(e)}" + return None, None, None, error_msg \ No newline at end of file diff --git a/src/PrescriptiveAnalysis1/Frontend/main.py b/src/PrescriptiveAnalysis1/Frontend/main.py index ef5a9e3..29bcab5 100644 --- a/src/PrescriptiveAnalysis1/Frontend/main.py +++ b/src/PrescriptiveAnalysis1/Frontend/main.py @@ -10,6 +10,7 @@ from ..Backend.gsp import preprocess_sequences_ordered, gsp_algorithm from ..Backend.apriori import run_apriori_analysis from ..Backend.fp_growth import run_fp_growth_analysis +from ..Backend.spade import preprocess_data_vertical, get_transaction_table, run_spade_analysis, format_pattern, get_pattern_length def apriori_graph_mining_app(): st.title("Apriori-Based Graph Mining") @@ -51,14 +52,12 @@ def gsp_algorithm_app(): ) if st.button("Run GSP Algorithm"): with st.spinner("Processing..."): - start_time = time.time() customer_sequences = preprocess_sequences_ordered(df) sequences = customer_sequences['SEQUENCE'].tolist() with st.expander("View Processed Sequences"): st.write(sequences) results = 
gsp_algorithm(sequences, min_support) - end_time = time.time() - st.success(f"Processing completed in {end_time - start_time:.2f} seconds!") + st.success("Processing completed!") st.header("GSP Algorithm Results") st.subheader("Frequent 1-Item Sequences") frequent_1 = results['1_item']['frequent'] @@ -88,7 +87,7 @@ def gsp_algorithm_app(): st.error(f"An error occurred: {str(e)}") def gspan_algorithm_app(): - st.title("GSPan Algorithm Implementation") + st.title("gSpan Algorithm Implementation") uploaded_file = st.file_uploader("Upload your JSON graph dataset file", type=['json'], key="gspan_file") if uploaded_file is not None: temp_file_path = "temp_graphs.json" @@ -102,7 +101,7 @@ def gspan_algorithm_app(): if graphs_dict is not None: min_support = st.slider("Minimum Support", 1, len(graphs_dict), 2, key="gspan_min_support") - if st.button("Run GSPan Algorithm"): + if st.button("Run gSpan Algorithm"): with st.spinner("Processing..."): st.header("DFS Codes for Each Graph") all_dfs_codes = {} @@ -163,7 +162,7 @@ def apriori_algorithm_app(): if error: st.error(f"Error: {error}") else: - st.success(f"Processing completed in {execution_time:.2f} seconds!") + st.success("Processing completed!") if not itemsets_df.empty: st.header("Frequent Itemsets") for level in sorted(itemsets_df["Level"].unique()): @@ -214,7 +213,7 @@ def fp_growth_algorithm_app(): if error: st.error(f"Error: {error}") else: - st.success(f"Processing completed in {execution_time:.2f} seconds!") + st.success("Processing completed!") if not itemsets_df.empty: st.header("Frequent Itemsets") for level in sorted(itemsets_df["Level"].unique()): @@ -233,18 +232,89 @@ def fp_growth_algorithm_app(): except Exception as e: st.error(f"An error occurred: {str(e)}") +def spade_algorithm_app(): + st.title("SPADE Algorithm Implementation") + st.write("This app performs sequential pattern mining using the SPADE algorithm.") + + uploaded_file = st.file_uploader("Upload your CSV file", type=["csv"], key="spade_file") + if uploaded_file is not None: + try: + df = pd.read_csv(uploaded_file) + st.success("File successfully uploaded and read!") + with st.expander("View Uploaded Data"): + st.dataframe(df) + + min_support = st.slider( + "Select minimum support threshold (0-1)", + min_value=0.01, + max_value=1.0, + value=0.5, + step=0.01, + key="spade_min_support" + ) + + if st.button("Run SPADE Algorithm"): + with st.spinner("Processing..."): + transactions_df, detailed_results, all_frequent_df, error = run_spade_analysis(df, min_support) + if error: + st.error(f"Error: {error}") + else: + st.success("Processing completed!") + + # Display vertical format sample + if "vertical_format_sample" in detailed_results: + st.header("Vertical Format Sample") + st.dataframe(detailed_results["vertical_format_sample"]) + + # Display transaction table + if transactions_df is not None and not transactions_df.empty: + st.header("Transaction Table") + st.dataframe(transactions_df) + st.write(f"Total unique sequences (customers): {detailed_results['total_sequences']}") + st.write(f"Minimum support threshold: {detailed_results['min_support']}") + + # Display Frequent 1-Sequences + if "frequent_1" in detailed_results: + st.header("SPADE Algorithm Results") + st.subheader("Frequent 1-Sequences") + st.dataframe(detailed_results["frequent_1"]) + + # Display each level of candidate and frequent sequences + for k, candidates_df in detailed_results.get("candidates", []): + st.subheader(f"Generating {k}-Sequences") + st.write(f"Candidate {k}-Sequences:") + 
st.dataframe(candidates_df) + + # Find the corresponding frequent sequences for this k + frequent_df = next((df for level, df in detailed_results.get("frequent", []) if level == k), None) + if frequent_df is not None: + st.write(f"Frequent {k}-Sequences:") + st.dataframe(frequent_df) + + # Display all frequent sequences + if not all_frequent_df.empty: + st.subheader("All Frequent Sequences (Ordered by Length)") + st.dataframe(all_frequent_df) + else: + st.write("No frequent sequences found.") + + except Exception as e: + st.error(f"An error occurred: {str(e)}") + def main(): st.sidebar.title("Algorithm Selection") - algorithm = st.sidebar.selectbox("Choose an algorithm", ["Apriori Algorithm", "FP-Growth Algorithm", "Apriori Graph Mining", "GSP Algorithm", "GSPan Algorithm"]) + algorithm = st.sidebar.selectbox("Choose an algorithm", ["Apriori Algorithm", "FP-Growth Algorithm", "SPADE Algorithm", "Apriori Graph Mining", "GSP Algorithm", "gSpan Algorithm"]) if algorithm == "Apriori Algorithm": apriori_algorithm_app() elif algorithm == "FP-Growth Algorithm": fp_growth_algorithm_app() + elif algorithm == "SPADE Algorithm": + spade_algorithm_app() elif algorithm == "Apriori Graph Mining": apriori_graph_mining_app() elif algorithm == "GSP Algorithm": gsp_algorithm_app() - elif algorithm == "GSPan Algorithm": + elif algorithm == "gSpan Algorithm": gspan_algorithm_app() if __name__ == "__main__": diff --git a/tests/PrescriptiveAnalysis1/test_apriori.py b/tests/PrescriptiveAnalysis1/test_apriori.py new file mode 100644 index 0000000..d351c7a --- /dev/null +++ b/tests/PrescriptiveAnalysis1/test_apriori.py @@ -0,0 +1,141 @@ +import unittest +import pandas as pd +import sys +import os +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, project_root) +sys.path.append('../../src') # Relative path from tests/PrescriptiveAnalysis1/ to src/ +from src.PrescriptiveAnalysis1.Backend.apriori import AprioriAlgorithm, BusinessRuleGenerator, run_apriori_analysis + +class TestApriori(unittest.TestCase): + def setUp(self): + # Sample transactional data + self.transactions = [ + {'A', 'B', 'C'}, + {'A', 'B'}, + {'B', 'C'}, + {'A', 'C'}, + {'A', 'B', 'D'} + ] + self.min_support = 0.4 # 40% (2 out of 5 transactions) + self.min_confidence = 0.5 + # Sample DataFrame for run_apriori_analysis + data = { + 'INVOICENO': [1, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 5], + 'PRODUCTNAME': ['A', 'B', 'C', 'A', 'B', 'B', 'C', 'A', 'C', 'A', 'B', 'D'] + } + self.df = pd.DataFrame(data) + + def test_apriori_algorithm_initialization(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + self.assertEqual(apriori.transactions, self.transactions) + self.assertEqual(apriori.min_support, self.min_support) + self.assertEqual(apriori.frequent_patterns, {}) + + def test_count_item_frequencies(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + candidates = [frozenset({'A'}), frozenset({'B'}), frozenset({'C'}), frozenset({'D'})] + frequent_items = apriori.count_item_frequencies(candidates) + expected = [ + (frozenset({'A'}), 4/5), + (frozenset({'B'}), 4/5), + (frozenset({'C'}), 3/5), + ] + self.assertEqual(len(frequent_items), 3) # D has support 1/5 < 0.4 + for item, support in frequent_items: + self.assertTrue((item, support) in expected) + + def test_create_new_combinations(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + prev_frequent = [frozenset({'A'}), frozenset({'B'}), frozenset({'C'})] + new_combinations = 
apriori.create_new_combinations(prev_frequent, 2) + expected = {frozenset({'A', 'B'}), frozenset({'A', 'C'}), frozenset({'B', 'C'})} + self.assertEqual(new_combinations, expected) + + def test_find_frequent_itemsets(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + frequent_patterns = apriori.find_frequent_itemsets() + self.assertIn(1, frequent_patterns) + self.assertIn(2, frequent_patterns) + # Level 1: A, B, C + level_1 = frequent_patterns[1] + self.assertEqual(len(level_1), 3) + expected_1 = {frozenset({'A'}), frozenset({'B'}), frozenset({'C'})} + self.assertTrue(all(item in [x[0] for x in level_1] for item in expected_1)) + # Level 2: A,B; A,C; B,C + level_2 = frequent_patterns[2] + self.assertEqual(len(level_2), 3) + expected_2 = {frozenset({'A', 'B'}), frozenset({'A', 'C'}), frozenset({'B', 'C'})} + self.assertTrue(all(item in [x[0] for x in level_2] for item in expected_2)) + + def test_execute(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + patterns, execution_time = apriori.execute() + self.assertEqual(patterns, apriori.frequent_patterns) + self.assertGreaterEqual(execution_time, 0) + self.assertIn(1, patterns) + self.assertIn(2, patterns) + self.assertEqual(len(patterns[1]), 3) # A, B, C + self.assertEqual(len(patterns[2]), 3) # A,B; A,C; B,C + + def test_business_rule_generator(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + frequent_patterns = apriori.find_frequent_itemsets() + rule_generator = BusinessRuleGenerator(frequent_patterns, self.transactions, self.min_confidence) + rules = rule_generator.derive_rules() + self.assertTrue(rules) + # Check a sample rule: A => B + for antecedent, consequent, support, confidence in rules: + if antecedent == 'A' and consequent == 'B': + self.assertAlmostEqual(support, 3/5) # A,B appears in 3 transactions + self.assertAlmostEqual(confidence, (3/5) / (4/5)) # Support(A,B) / Support(A) + self.assertGreaterEqual(confidence, self.min_confidence) + + def test_compute_confidence(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + frequent_patterns = apriori.find_frequent_itemsets() + rule_generator = BusinessRuleGenerator(frequent_patterns, self.transactions, self.min_confidence) + confidence = rule_generator.compute_confidence(frozenset({'A'}), frozenset({'B'})) + self.assertAlmostEqual(confidence, (3/5) / (4/5)) # Support(A,B) / Support(A) + confidence = rule_generator.compute_confidence(frozenset({'D'}), frozenset({'A'})) + self.assertEqual(confidence, 0) # D not frequent + + def test_fetch_support(self): + apriori = AprioriAlgorithm(self.transactions, self.min_support) + frequent_patterns = apriori.find_frequent_itemsets() + rule_generator = BusinessRuleGenerator(frequent_patterns, self.transactions, self.min_confidence) + support = rule_generator.fetch_support(frozenset({'A', 'B'})) + self.assertAlmostEqual(support, 3/5) + support = rule_generator.fetch_support(frozenset({'A', 'D'})) + self.assertEqual(support, 0) # A,D not frequent + + def test_run_apriori_analysis(self): + itemsets_df, rules_df, execution_time, error = run_apriori_analysis(self.df, self.min_support, self.min_confidence) + self.assertIsNone(error) + self.assertIsNotNone(itemsets_df) + self.assertIsNotNone(rules_df) + self.assertGreaterEqual(execution_time, 0) + # Check DataFrame columns + self.assertEqual(list(itemsets_df.columns), ['Level', 'Frequent Itemset', 'Support']) + self.assertEqual(list(rules_df.columns), ['Antecedent', 'Consequent', 'Support', 'Confidence']) + # 
Verify some frequent itemsets + self.assertTrue(any('A, B' in itemset for itemset in itemsets_df['Frequent Itemset'])) + # Verify a rule + self.assertTrue(any((row['Antecedent'] == 'A') & (row['Consequent'] == 'B') + for _, row in rules_df.iterrows())) + + def test_run_apriori_analysis_empty(self): + empty_df = pd.DataFrame({'INVOICENO': [], 'PRODUCTNAME': []}) + itemsets_df, rules_df, execution_time, error = run_apriori_analysis(empty_df, self.min_support, self.min_confidence) + self.assertEqual(error, "No valid transactions found.") + self.assertIsNone(itemsets_df) + self.assertIsNone(rules_df) + self.assertIsNone(execution_time) + + def test_run_apriori_analysis_high_support(self): + apriori = AprioriAlgorithm(self.transactions, 0.9) + patterns = apriori.find_frequent_itemsets() + self.assertEqual(patterns, {}) # No itemsets with support >= 0.9 + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/PrescriptiveAnalysis1/test_apriori_graph.py b/tests/PrescriptiveAnalysis1/test_apriori_graph.py new file mode 100644 index 0000000..fb60cb3 --- /dev/null +++ b/tests/PrescriptiveAnalysis1/test_apriori_graph.py @@ -0,0 +1,221 @@ +import unittest +import io +import sys +import os +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, project_root) +from src.PrescriptiveAnalysis1.Backend.apriori_graph import parse_graph_file, get_all_edges, compute_support, apriori_graph_mining + +class TestAprioriGraph(unittest.TestCase): + def setUp(self): + self.graph_data = """ +# Graph 1 +A B +B C +A D +B E +C E +C F +# Graph 2 +A B +B C +A D +B E +# Graph 3 +A C +C D +B E +E F +A F +""" + # Create a file-like object + self.graph_file = io.BytesIO(self.graph_data.encode('utf-8')) + + # Parse graphs for use in tests + self.graph_file.seek(0) + self.graphs = parse_graph_file(self.graph_file) + + # Expected unique edges (sorted tuples) + self.expected_edges = [ + ('A', 'B'), ('A', 'C'), ('A', 'D'), ('A', 'F'), + ('B', 'C'), ('B', 'E'), ('C', 'D'), ('C', 'E'), + ('C', 'F'), ('E', 'F') + ] + + def test_parse_graph_file(self): + self.graph_file.seek(0) + graphs = parse_graph_file(self.graph_file) + self.assertEqual(len(graphs), 3) + # Graph 1: {A-B, B-C, A-D, B-E, C-E, C-F} + self.assertEqual(set(graphs[0]), { + ('A', 'B'), ('B', 'C'), ('A', 'D'), ('B', 'E'), ('C', 'E'), ('C', 'F') + }) + # Graph 2: {A-B, B-C, A-D, B-E} + self.assertEqual(set(graphs[1]), { + ('A', 'B'), ('B', 'C'), ('A', 'D'), ('B', 'E') + }) + # Graph 3: {A-C, C-D, B-E, E-F, A-F} + self.assertEqual(set(graphs[2]), { + ('A', 'C'), ('C', 'D'), ('B', 'E'), ('E', 'F'), ('A', 'F') + }) + + def test_parse_graph_file_empty(self): + empty_file = io.BytesIO(b"") + graphs = parse_graph_file(empty_file) + self.assertEqual(graphs, []) + + def test_parse_graph_file_single_graph(self): + single_graph_data = """ +# Graph 1 +A B +B C +""" + single_file = io.BytesIO(single_graph_data.encode('utf-8')) + graphs = parse_graph_file(single_file) + self.assertEqual(len(graphs), 1) + self.assertEqual(set(graphs[0]), {('A', 'B'), ('B', 'C')}) + + def test_get_all_edges(self): + edges = get_all_edges(self.graphs) + self.assertEqual(edges, self.expected_edges) + self.assertEqual(len(edges), 10) + + def test_get_all_edges_empty(self): + edges = get_all_edges([]) + self.assertEqual(edges, []) + + def test_compute_support(self): + # Single edge support + self.assertEqual(compute_support([('A', 'B')], self.graphs), 2) # G1, G2 + self.assertEqual(compute_support([('B', 'E')], 
self.graphs), 3) # G1, G2, G3 + self.assertEqual(compute_support([('A', 'F')], self.graphs), 1) # G3 + # Multi-edge support + self.assertEqual(compute_support([('A', 'B'), ('B', 'C')], self.graphs), 2) # G1, G2 + self.assertEqual(compute_support([('A', 'C'), ('C', 'D')], self.graphs), 1) # G3 + self.assertEqual(compute_support([('A', 'B'), ('B', 'E'), ('A', 'D')], self.graphs), 2) # G1, G2 + + def test_compute_support_empty_graphs(self): + support = compute_support([('A', 'B')], []) + self.assertEqual(support, 0) + + def test_apriori_graph_mining_min_support_2(self): + tables, frequent_edge_sets = apriori_graph_mining(self.graphs, min_support=2) + self.assertTrue(len(tables) >= 3) # At least k=1, k=2, k=3 + self.assertTrue(len(frequent_edge_sets) >= 3) + + # k=1 table + table_1 = tables[0] + self.assertEqual(len(table_1), 10) # All 10 edges + expected_edges = { + '(A, B)': {'support': 2, 'graphs': [0, 1]}, + '(A, C)': {'support': 1, 'graphs': [2]}, + '(A, D)': {'support': 2, 'graphs': [0, 1]}, + '(A, F)': {'support': 1, 'graphs': [2]}, + '(B, C)': {'support': 2, 'graphs': [0, 1]}, + '(B, E)': {'support': 3, 'graphs': [0, 1, 2]}, + '(C, D)': {'support': 1, 'graphs': [2]}, + '(C, E)': {'support': 1, 'graphs': [0]}, + '(C, F)': {'support': 1, 'graphs': [0]}, + '(E, F)': {'support': 1, 'graphs': [2]} + } + for entry in table_1: + edge = entry['Edge'] + self.assertIn(edge, expected_edges) + self.assertEqual(entry['Support'], expected_edges[edge]['support']) + self.assertEqual(entry['Qualify'], 'Y' if expected_edges[edge]['support'] >= 2 else 'N') + for i in range(3): + expected = 'Y' if i in expected_edges[edge]['graphs'] else 'N' + self.assertEqual(entry[f'Graph {i+1}'], expected) + + # k=1 frequent edge sets + self.assertEqual(len(frequent_edge_sets[0]), 4) # (A,B), (A,D), (B,C), (B,E) + expected_frequent_1 = [[('A', 'B')], [('A', 'D')], [('B', 'C')], [('B', 'E')]] + self.assertTrue(all(edge_set in frequent_edge_sets[0] for edge_set in expected_frequent_1)) + + # k=2 table + table_2 = tables[1] + expected_k2 = { + '(A, B) (A, D)': {'support': 2, 'graphs': [0, 1]}, + '(A, B) (B, C)': {'support': 2, 'graphs': [0, 1]}, + '(A, B) (B, E)': {'support': 2, 'graphs': [0, 1]}, + '(A, D) (B, C)': {'support': 2, 'graphs': [0, 1]}, + '(A, D) (B, E)': {'support': 2, 'graphs': [0, 1]}, + '(B, C) (B, E)': {'support': 2, 'graphs': [0, 1]} + } + self.assertEqual(len(table_2), len(expected_k2)) + for entry in table_2: + edge_pairs = entry['Edge Pairs'] + self.assertIn(edge_pairs, expected_k2) + self.assertEqual(entry['Support'], expected_k2[edge_pairs]['support']) + self.assertEqual(entry['Qualify'], 'Y') + for i in range(3): + expected = 'Y' if i in expected_k2[edge_pairs]['graphs'] else 'N' + self.assertEqual(entry[f'Graph {i+1}'], expected) + + # k=2 frequent edge sets + self.assertEqual(len(frequent_edge_sets[1]), 6) + expected_frequent_2 = [ + [('A', 'B'), ('A', 'D')], + [('A', 'B'), ('B', 'C')], + [('A', 'B'), ('B', 'E')], + [('A', 'D'), ('B', 'C')], + [('A', 'D'), ('B', 'E')], + [('B', 'C'), ('B', 'E')] + ] + self.assertTrue(all(sorted(edge_set) in [sorted(es) for es in frequent_edge_sets[1]] for edge_set in expected_frequent_2)) + + # k=3 table + table_3 = tables[2] + expected_k3 = { + '(A, B) (A, D) (B, C)': {'support': 2, 'graphs': [0, 1]}, + '(A, B) (A, D) (B, E)': {'support': 2, 'graphs': [0, 1]}, + '(A, B) (B, C) (B, E)': {'support': 2, 'graphs': [0, 1]}, + '(A, D) (B, C) (B, E)': {'support': 2, 'graphs': [0, 1]} + } + self.assertEqual(len(table_3), len(expected_k3)) + for entry in table_3: + 
edge_pairs = entry['Edge Pairs'] + self.assertIn(edge_pairs, expected_k3) + self.assertEqual(entry['Support'], expected_k3[edge_pairs]['support']) + self.assertEqual(entry['Qualify'], 'Y') + for i in range(3): + expected = 'Y' if i in expected_k3[edge_pairs]['graphs'] else 'N' + self.assertEqual(entry[f'Graph {i+1}'], expected) + + # k=3 frequent edge sets + self.assertEqual(len(frequent_edge_sets[2]), 4) + expected_frequent_3 = [ + [('A', 'B'), ('A', 'D'), ('B', 'C')], + [('A', 'B'), ('A', 'D'), ('B', 'E')], + [('A', 'B'), ('B', 'C'), ('B', 'E')], + [('A', 'D'), ('B', 'C'), ('B', 'E')] + ] + self.assertTrue(all(sorted(edge_set) in [sorted(es) for es in frequent_edge_sets[2]] for edge_set in expected_frequent_3)) + + def test_apriori_graph_mining_min_support_3(self): + tables, frequent_edge_sets = apriori_graph_mining(self.graphs, min_support=3) + self.assertEqual(len(tables), 2) # k=1, k=2 (k=2 is empty) + self.assertEqual(len(frequent_edge_sets), 2) + # k=1: Only (B,E) has support 3 + table_1 = tables[0] + self.assertEqual(len(frequent_edge_sets[0]), 1) + self.assertEqual(frequent_edge_sets[0], [[('B', 'E')]]) + for entry in table_1: + if entry['Edge'] == '(B, E)': + self.assertEqual(entry['Support'], 3) + self.assertEqual(entry['Qualify'], 'Y') + self.assertEqual(entry['Graph 1'], 'Y') + self.assertEqual(entry['Graph 2'], 'Y') + self.assertEqual(entry['Graph 3'], 'Y') + else: + self.assertEqual(entry['Qualify'], 'N') + # k=2: Empty + self.assertEqual(frequent_edge_sets[1], []) + + def test_apriori_graph_mining_empty_graphs(self): + tables, frequent_edge_sets = apriori_graph_mining([], min_support=2) + self.assertEqual(tables, [[]]) + self.assertEqual(frequent_edge_sets, [[]]) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/PrescriptiveAnalysis1/test_fp_growth.py b/tests/PrescriptiveAnalysis1/test_fp_growth.py new file mode 100644 index 0000000..340127a --- /dev/null +++ b/tests/PrescriptiveAnalysis1/test_fp_growth.py @@ -0,0 +1,105 @@ +import unittest +import pandas as pd +import sys +import sys +import os +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, project_root) +from src.PrescriptiveAnalysis1.Backend.fp_growth import FPNode, FPTree, FPGrowth, BusinessRuleGenerator, run_fp_growth_analysis + + +class TestFPGrowth(unittest.TestCase): + def setUp(self): + # Sample transactions for testing + self.transactions = [ + {'A', 'B', 'C'}, + {'A', 'B'}, + {'B', 'C'}, + {'A', 'C'}, + {'A', 'B', 'C', 'D'} + ] + self.min_support = 0.4 # 40% (2 out of 5 transactions) + self.min_confidence = 0.5 + # Sample DataFrame for run_fp_growth_analysis + data = { + 'INVOICENO': [1, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 5, 5], + 'PRODUCTNAME': ['A', 'B', 'C', 'A', 'B', 'B', 'C', 'A', 'C', 'A', 'B', 'C', 'D'] + } + self.df = pd.DataFrame(data) + + def test_fp_node_initialization(self): + node = FPNode('A', 2, None) + self.assertEqual(node.item, 'A') + self.assertEqual(node.count, 2) + self.assertIsNone(node.parent) + self.assertEqual(node.children, {}) + self.assertIsNone(node.next_link) + + def test_fp_tree_build(self): + tree = FPTree(self.transactions, self.min_support, len(self.transactions)) + self.assertIsNotNone(tree.root) + self.assertEqual(tree.root.item, None) + self.assertTrue(tree.item_support) # Ensure item_support is populated + # Check if frequent items meet min_support (2 transactions) + expected_items = {'A', 'B', 'C'} # D should be excluded (appears in 1 transaction) + 
self.assertEqual(set(tree.item_support.keys()), expected_items) + + def test_fp_tree_insert_transaction(self): + tree = FPTree([], self.min_support, 5) # Empty tree + tree.item_support = {'A': [3, None], 'B': [2, None]} + transaction = ['A', 'B'] + tree.insert_transaction(transaction, tree.root) + # Check if nodes were created + self.assertIn('A', tree.root.children) + self.assertIn('B', tree.root.children['A'].children) + # Check counts + self.assertEqual(tree.root.children['A'].count, 1) + self.assertEqual(tree.root.children['A'].children['B'].count, 1) + # Check header table linkage + self.assertIsNotNone(tree.item_support['A'][1]) + self.assertIsNotNone(tree.item_support['B'][1]) + + def test_business_rule_generator(self): + fp_growth = FPGrowth(self.transactions, self.min_support) + patterns, _ = fp_growth.find_frequent_patterns() + rule_generator = BusinessRuleGenerator(patterns, self.transactions, self.min_confidence) + rules = rule_generator.derive_rules() + self.assertTrue(rules) # Ensure rules are generated + # Check a sample rule, e.g., {A, B} => {C} + for antecedent, consequent, support, confidence in rules: + if antecedent == 'A, B' and consequent == 'C': + self.assertGreaterEqual(confidence, self.min_confidence) + self.assertAlmostEqual(support, 2/5) # {A, B, C} appears in 2 transactions + + def test_run_fp_growth_analysis(self): + itemsets_df, rules_df, execution_time, error = run_fp_growth_analysis( + self.df, self.min_support, self.min_confidence + ) + self.assertIsNone(error) + self.assertIsNotNone(itemsets_df) + self.assertIsNotNone(rules_df) + self.assertGreaterEqual(execution_time, 0) # Modified to allow zero + # Check if itemsets_df has expected columns + self.assertEqual(list(itemsets_df.columns), ['Level', 'Frequent Itemset', 'Support']) + # Check if rules_df has expected columns + self.assertEqual(list(rules_df.columns), ['Antecedent', 'Consequent', 'Support', 'Confidence']) + # Verify some frequent itemsets + self.assertTrue(any('A, B' in itemset for itemset in itemsets_df['Frequent Itemset'])) + + def test_empty_transactions(self): + df = pd.DataFrame({'INVOICENO': [], 'PRODUCTNAME': []}) + itemsets_df, rules_df, execution_time, error = run_fp_growth_analysis( + df, self.min_support, self.min_confidence + ) + self.assertEqual(error, "No valid transactions found.") + self.assertIsNone(itemsets_df) + self.assertIsNone(rules_df) + self.assertIsNone(execution_time) + + def test_low_support(self): + fp_growth = FPGrowth(self.transactions, 0.9) + patterns, _ = fp_growth.find_frequent_patterns() + self.assertEqual(patterns, {}) # No patterns should be found + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/PrescriptiveAnalysis1/test_gspan.py b/tests/PrescriptiveAnalysis1/test_gspan.py new file mode 100644 index 0000000..e53f9c8 --- /dev/null +++ b/tests/PrescriptiveAnalysis1/test_gspan.py @@ -0,0 +1,154 @@ +import unittest +import json +import sys +import os +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, project_root) +from src.PrescriptiveAnalysis1.Backend.gspan import load_graphs_from_json, construct_dfs_code, normalize_edge, is_subgraph_present, enumerate_subgraphs, run_gspan_analysis + +class TestGSpan(unittest.TestCase): + def setUp(self): + self.test_json_content = { + "G1": { + "A": ["B", "C"], + "B": ["A"], + "C": ["A", "D"], + "D": ["C", "A"] + }, + "G2": { + "A": ["B", "C"], + "B": ["A", "D"], + "C": ["A", "E"], + "D": ["B"], + "E": ["C"] + }, + "G3": { + 
"A": ["B", "C"], + "B": ["A", "D"], + "C": ["D", "A"], + "D": ["B", "C"] + } + } + self.test_json_file = "test_gspan_graphs.json" + with open(self.test_json_file, 'w') as f: + json.dump(self.test_json_content, f) + + self.graphs = load_graphs_from_json(self.test_json_file) + self.directed = True + self.min_support = 2 + + def tearDown(self): + if os.path.exists(self.test_json_file): + os.remove(self.test_json_file) + + def test_load_graphs_from_json(self): + graphs = load_graphs_from_json(self.test_json_file) + self.assertIsNotNone(graphs) + self.assertEqual(len(graphs), 3) + self.assertIn("G1", graphs) + self.assertIn("G2", graphs) + self.assertIn("G3", graphs) + self.assertEqual(set(graphs["G1"].keys()), {"A", "B", "C", "D"}) + self.assertEqual(graphs["G1"]["A"], ["B", "C"]) + + def test_load_graphs_from_json_invalid_file(self): + result = load_graphs_from_json("non_existent.json") + self.assertIsNone(result) + + def test_load_graphs_from_json_invalid_json(self): + with open("invalid.json", "w") as f: + f.write("invalid json") + result = load_graphs_from_json("invalid.json") + self.assertIsNone(result) + os.remove("invalid.json") + + def test_construct_dfs_code(self): + graph = self.graphs["G1"] + dfs_code, discovery_order = construct_dfs_code(graph, "A", directed=True) + self.assertTrue(dfs_code) + self.assertTrue(discovery_order) + self.assertEqual(len(discovery_order), len(graph)) + for code in dfs_code: + self.assertEqual(len(code), 5) + self.assertIn(code[2], graph.keys()) + self.assertIn(code[4], graph.keys()) + self.assertEqual(code[3], 1) + + def test_normalize_edge_directed(self): + edge = normalize_edge("A", "B", True, directed=True) + self.assertEqual(edge, ("A", "B", True)) + edge = normalize_edge("B", "A", False, directed=True) + self.assertEqual(edge, ("B", "A", False)) + + def test_normalize_edge_undirected(self): + edge = normalize_edge("A", "B", True, directed=False) + self.assertEqual(edge, ("A", "B", True)) + edge = normalize_edge("B", "A", False, directed=False) + self.assertEqual(edge, ("A", "B", True)) + + def test_is_subgraph_present_directed(self): + dfs_code, _ = construct_dfs_code(self.graphs["G1"], "A", directed=True) + subgraph_edges = [("A", "B", True), ("A", "C", True)] + self.assertTrue(is_subgraph_present(subgraph_edges, dfs_code, directed=True)) + subgraph_edges = [("A", "E", True)] + self.assertFalse(is_subgraph_present(subgraph_edges, dfs_code, directed=True)) + + def test_is_subgraph_present_undirected(self): + dfs_code, _ = construct_dfs_code(self.graphs["G1"], "A", directed=False) + subgraph_edges = [("A", "B", True), ("A", "C", True)] + self.assertTrue(is_subgraph_present(subgraph_edges, dfs_code, directed=False)) + subgraph_edges = [("A", "E", True)] + self.assertFalse(is_subgraph_present(subgraph_edges, dfs_code, directed=False)) + + def test_enumerate_subgraphs_directed(self): + frequent_subgraphs, infrequent_subgraphs, dfs_codes = enumerate_subgraphs(self.graphs, self.min_support, directed=True) + self.assertTrue(frequent_subgraphs) + self.assertTrue(dfs_codes) + for size, subgraphs in frequent_subgraphs.items(): + for edge_str, (edges, support, _) in subgraphs.items(): + self.assertGreaterEqual(support, self.min_support) + supporting_graphs = [g for g, code in dfs_codes.items() if is_subgraph_present(edges, code, directed=True)] + self.assertEqual(len(supporting_graphs), support) + self.assertIn("(A-B)", frequent_subgraphs[1]) + self.assertIn("(A-C)", frequent_subgraphs[1]) + self.assertEqual(frequent_subgraphs[1]["(A-B)"][1], 3) + + + def 
test_enumerate_subgraphs_undirected(self): + frequent_subgraphs, infrequent_subgraphs, dfs_codes = enumerate_subgraphs(self.graphs, self.min_support, directed=False) + self.assertTrue(frequent_subgraphs) + self.assertIn("(A-B)", frequent_subgraphs[1]) + self.assertNotIn("(B-A)", frequent_subgraphs[1]) + + def test_run_gspan_analysis(self): + result_tables, frequent_edge_sets = run_gspan_analysis(self.graphs, self.min_support, directed=True) + self.assertTrue(result_tables) + self.assertTrue(frequent_edge_sets) + for table in result_tables: + for entry in table: + self.assertIn("Edge Pairs", entry) + self.assertIn("Support", entry) + self.assertIn("Qualify", entry) + self.assertIn("Graph 1", entry) + self.assertIn("Graph 2", entry) + self.assertIn("Graph 3", entry) + self.assertEqual(entry["Qualify"], "Y") + self.assertGreaterEqual(entry["Support"], self.min_support) + found_ab = False + for table in result_tables: + for entry in table: + if entry["Edge Pairs"] == "(A-B)": + found_ab = True + self.assertEqual(entry["Support"], 3) + self.assertEqual(entry["Graph 1"], "Y") + self.assertEqual(entry["Graph 2"], "Y") + self.assertEqual(entry["Graph 3"], "Y") + self.assertTrue(found_ab) + + def test_run_gspan_analysis_high_min_support(self): + result_tables, frequent_edge_sets = run_gspan_analysis(self.graphs, min_support=4, directed=True) + self.assertEqual(result_tables, []) + self.assertEqual(frequent_edge_sets, []) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/PrescriptiveAnalysis1/test_spade.py b/tests/PrescriptiveAnalysis1/test_spade.py new file mode 100644 index 0000000..bfb516c --- /dev/null +++ b/tests/PrescriptiveAnalysis1/test_spade.py @@ -0,0 +1,69 @@ +import unittest +import pandas as pd +import sys +import os +from collections import defaultdict + +# Add project root to Python path +project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, project_root) + +from src.PrescriptiveAnalysis1.Backend.spade import ( + preprocess_data_vertical, + get_transaction_table, + create_idlists, + format_pattern, + get_pattern_length, + run_spade_analysis +) + +class TestSPADE(unittest.TestCase): + def setUp(self): + # Load sample data for testing + data = { + 'NAME': [1, 1, 1, 1, 2, 2, 3, 4, 4, 4], + 'INVOICEDATE': ['1/1/2025', '1/3/2025', '1/4/2025', '1/4/2025', '1/1/2025', '1/1/2025', '1/1/2025', '1/2/2025', '1/2/2025', '1/3/2025'], + 'PRODUCTNAME': ['C,D', 'A,B,C', 'A,B,F', 'A,C,D,F', 'A,B,F', 'E', 'A,B,F', 'D,H,G', 'B,F', 'A,G,H'] + } + self.df = pd.DataFrame(data) + self.min_support = 0.5 # 50% (2 out of 4 sequences) + # Preprocessed vertical format for use in tests + self.vertical_df, _ = preprocess_data_vertical(self.df) + + def test_get_transaction_table(self): + transactions_df, error = get_transaction_table(self.vertical_df) + self.assertIsNone(error) + self.assertIsNotNone(transactions_df) + self.assertEqual(list(transactions_df.columns), ['Customer ID (SID)', 'Event ID (EID)', 'Items']) + self.assertGreater(len(transactions_df), 0) # Ensure non-empty + + def test_create_idlists(self): + idlists, error = create_idlists(self.vertical_df) + self.assertIsNone(error) + self.assertIsInstance(idlists, defaultdict) + # Check some items + self.assertIn('A', idlists) + self.assertIn('B', idlists) + + def test_format_pattern(self): + self.assertEqual(format_pattern(frozenset(['A', 'B'])), '{A, B}') + self.assertEqual(format_pattern(('A', 'B')), ' B>') + self.assertEqual(format_pattern(frozenset(['C'])), 
'{C}') + + def test_get_pattern_length(self): + self.assertEqual(get_pattern_length(frozenset(['A', 'B'])), 2) + self.assertEqual(get_pattern_length(('A', 'B')), 2) + self.assertEqual(get_pattern_length(frozenset(['C'])), 1) + + def test_run_spade_analysis(self): + transactions_df, detailed_results, all_frequent_df, error = run_spade_analysis(self.df, self.min_support) + self.assertIsNone(error) + self.assertIsNotNone(transactions_df) + self.assertIsNotNone(detailed_results) + self.assertIsNotNone(all_frequent_df) + # Check basic structure + self.assertEqual(list(all_frequent_df.columns), ['Pattern', 'Support', 'Pattern Type', 'Length']) + self.assertGreater(len(all_frequent_df), 0) # Ensure non-empty results + +if __name__ == '__main__': + unittest.main() \ No newline at end of file
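
Usage sketch: a minimal example of driving the new SPADE backend outside the Streamlit UI, assuming the package layout introduced above and the project root on sys.path (as in the test suite). The dataset path and the 0.5 threshold are illustrative only, mirroring the example2.csv setting documented in the readme; any CSV with NAME, INVOICEDATE and PRODUCTNAME columns should work the same way.

    import pandas as pd
    from src.PrescriptiveAnalysis1.Backend.spade import run_spade_analysis

    # Illustrative dataset path; bundled with the PR under Datasets/.
    df = pd.read_csv("Datasets/PrescriptiveAnalysis1/SPADE/example2.csv")

    # run_spade_analysis returns (transaction table, per-level details, all frequent patterns, error).
    transactions_df, detailed_results, all_frequent_df, error = run_spade_analysis(df, 0.5)

    if error:
        print(error)
    else:
        # Vertical (SID, EID, item) view built during preprocessing.
        print(detailed_results["vertical_format_sample"].head())
        # Every frequent itemset/sequence with its absolute support count, pattern type, and length.
        print(all_frequent_df.to_string(index=False))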