3 changes: 2 additions & 1 deletion silnlp/common/check_books.py
@@ -50,7 +50,8 @@ def parse_book(project_dir: str, book: str):

settings = FileParatextProjectSettingsParser(project_dir).parse()
book_path = Path(project_dir) / settings.get_book_file_name(book)

LOGGER.info(f"Attempting to parse {book} from {book_path}.")

if not book_path.is_file():
raise RuntimeError(f"Can't find file {book_path} for book {book}")

116 changes: 116 additions & 0 deletions silnlp/common/combine_scores_save.py
@@ -0,0 +1,116 @@
import argparse
import csv
import sys
from collections import defaultdict
from pathlib import Path

import pandas as pd

from ..common.environment import SIL_NLP_ENV


def check_for_lock_file(folder: Path, filename: str, file_type: str):
"""Check for lock files and ask the user to close them then exit."""

if file_type[0] == ".":
file_type = file_type[1:]

if file_type.lower() == "csv":
lockfile = folder / f".~lock.{filename}.{file_type}#"
elif file_type.lower() == "xlsx":
lockfile = folder / f"~${filename}.{file_type}"

if lockfile.is_file():
print(f"Found lock file: {lockfile}")
print(f"Please close {filename}.{file_type} in folder {folder} OR delete the lock file and try again.")
sys.exit()


def aggregate_csv(folder_path):
# Dictionary to store rows by header type
data_by_header = defaultdict(list)

# Iterate over all CSV files in the folder and its subfolders
for csv_file in folder_path.rglob("*/scores-*.csv"):
series = csv_file.parts[-3] # Extract series folder name
experiment = csv_file.parts[-2] # Extract experiment folder name
steps = csv_file.stem.split("-")[-1] # Extract steps from file name

# Read the CSV file and add new columns
with open(csv_file, "r") as f:
reader = csv.reader(f)
rows = list(reader)
            if not rows:
                continue  # Skip empty score files
            header = tuple(rows[0])  # Use tuple to make it hashable

# Add columns to the beginning of each row
if header not in data_by_header:
data_by_header[header].append(["Series", "Experiment", "Steps"] + list(header))
for row in rows[1:]:
data_by_header[header].append([series, experiment, steps] + row)

return data_by_header
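
# A sketch of the expected layout and result (hypothetical names): given
# <folder>/series1/exp_a/scores-5000.csv whose header row is "Book,BLEU",
# aggregate_csv(folder) returns
#     {("Book", "BLEU"): [["Series", "Experiment", "Steps", "Book", "BLEU"],
#                         ["series1", "exp_a", "5000", "MAT", "35.2"], ...]}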


def write_to_csv(data_by_header, folder, output_filename):

output_file = folder / f"{output_filename}.csv"
with open(output_file, "w", newline="") as f:
writer = csv.writer(f)
for header, rows in data_by_header.items():
writer.writerows(rows)
writer.writerow([]) # Add a blank row to separate different types
# Write the folder path to the last line of the CSV file
writer.writerow([folder])
print(f"Wrote scores to {output_file}")


def write_to_excel(data_by_header, folder, output_filename):
output_file = folder / f"{output_filename}.xlsx"
with pd.ExcelWriter(output_file) as writer:
for i, (header, rows) in enumerate(data_by_header.items()):
# Create a DataFrame for the current header
df = pd.DataFrame(rows[1:], columns=rows[0])
            # Convert columns to numeric where possible; errors="ignore" is
            # deprecated in newer pandas, so try each column individually.
            for col in df.columns:
                try:
                    df[col] = pd.to_numeric(df[col])
                except (ValueError, TypeError):
                    pass
# Generate a unique sheet name
sheet_name = f"Table_{i + 1}"
# Write the DataFrame to the Excel file
df.to_excel(writer, sheet_name=sheet_name, index=False)
print(f"Wrote scores to {output_file}")


def main():
parser = argparse.ArgumentParser(description="Aggregate CSV files in a folder.")
parser.add_argument("folder", type=Path, help="Path to the folder containing CSV files.")
parser.add_argument(
"--output_filename",
type=str,
default="scores",
help="Filename suffix without the '.csv' or '.xlsx'. \
The folder name is added as a prefix to make it easier to distinguish scores files in search results.",
)
args = parser.parse_args()

    folder = Path(args.folder)
    if not folder.is_dir():
        folder = Path(SIL_NLP_ENV.mt_experiments_dir) / args.folder

    # Use only the folder's name (not its full path) as the filename prefix.
    csv_filename = f"{folder.name}_{args.output_filename}"
    excel_filename = f"{folder.name}_{args.output_filename}"

# Check for lock files and ask the user to close them.
check_for_lock_file(folder, csv_filename, "csv")
check_for_lock_file(folder, excel_filename, "xlsx")

data = aggregate_csv(folder)

# Write the aggregated data to a new CSV file
write_to_csv(data, folder, csv_filename)

# Write the aggregated data to an Excel file
write_to_excel(data, folder, excel_filename)


if __name__ == "__main__":
main()
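
A minimal usage sketch (hypothetical series folder name; assumes the script is run as a module so that its relative imports resolve):

    python -m silnlp.common.combine_scores_save MySeries --output_filename scores

If MySeries is not a local directory, it is looked up under SIL_NLP_ENV.mt_experiments_dir, and the aggregated MySeries_scores.csv and MySeries_scores.xlsx are written into it.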
111 changes: 86 additions & 25 deletions silnlp/common/find_by_iso.py
@@ -1,19 +1,23 @@
import argparse
import json
import logging
from pathlib import Path
from typing import Dict, List, Set, Tuple, Union
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Tuple

import regex as re

from .environment import SIL_NLP_ENV
from .iso_info import NLLB_ISO_SET, ALT_ISO
from .iso_info import ALT_ISO, NLLB_ISO_SET

IsoCode = str
IsoCodeList = List[IsoCode]
IsoCodeSet = Set[IsoCode]

LANGUAGE_FAMILY_FILE = SIL_NLP_ENV.assets_dir / "languageFamilies.json"


def load_language_data(file_path: Path) -> Tuple[Dict, Dict, Dict]:
try:
with open(file_path, "r", encoding="utf-8") as file:
@@ -54,8 +58,6 @@ def find_related_isocodes(
for iso_code in iso_codes:
if iso_code in language_data:
lang_info = language_data[iso_code]
# logger.info(f"{iso_code}: {lang_info['Name']}, {lang_info['Country']}, {lang_info['Family']}")

iso_set.update(country_data.get(lang_info["Country"], []))
iso_set.update(family_data.get(lang_info["Family"], []))

@@ -64,10 +66,10 @@

def get_files_by_iso(isocodes: IsoCodeList, scripture_dir: Path) -> List[Path]:
return [
file for file in scripture_dir.glob('*.txt')
if any(file.stem.startswith(isocode + '-') for isocode in isocodes)
file for file in scripture_dir.glob("*.txt") if any(file.stem.startswith(isocode + "-") for isocode in isocodes)
]


def split_files_by_projects(files: List[Path], projects_dir: Path) -> Tuple[Dict[Path, Path], Dict[Path, Path]]:
existing_projects = {}
missing_projects = {}
@@ -85,24 +87,64 @@ def split_files_by_projects(files: List[Path], projects_dir: Path) -> Tuple[Dict
def get_equivalent_isocodes(iso_codes: List[str]) -> Set[str]:
return {code for iso_code in iso_codes for code in (iso_code, ALT_ISO.get_alternative(iso_code)) if code}
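# For example, get_equivalent_isocodes(["eng"]) would yield {"eng", "en"},
# assuming ALT_ISO maps between the 2- and 3-letter forms of each code.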


def filter_files(files: List[Path], excluded_patterns: List[str]) -> List[Path]:
filtered = []

today = datetime.now()
today_pattern = re.compile(f"{today.strftime('_%Y_%m_%d')}|{today.strftime('_%d_%m_%Y')}")
date_pattern = re.compile(r"_\d{4}_\d{1,2}_\d{1,2}|_\d{1,2}_\d{1,2}_\d{4}")
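    # Files stamped with today's date (_YYYY_MM_DD or _DD_MM_YYYY) are kept;
    # files carrying any other date stamp are treated as stale and skipped.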

for file in files:
parts = file.stem.split("-", 1)
if len(parts) != 2:
continue
iso, name = parts
if today_pattern.search(name):
filtered.append(file)
continue
if date_pattern.search(name):
continue
if len(iso) not in (2, 3):
continue
if any(pattern.lower() in name.lower() for pattern in excluded_patterns):
continue
if file.is_file() and file.stat().st_size < 100_000:
continue
filtered.append(file)
return filtered
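
# A sketch of the rules above with hypothetical file names:
#   "swh-MyProject.txt"             -> kept
#   "swh-MyProject_2020_01_15.txt"  -> dropped (stale date stamp)
#   "swh-XRI-draft.txt"             -> dropped (matches excluded pattern "XRI")
#   "swahili-MyProject.txt"         -> dropped (ISO part is not 2 or 3 letters)
# Existing files smaller than 100 KB are dropped as well.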


def main():
parser = argparse.ArgumentParser(description="Find related ISO language codes.")
parser.add_argument("iso_codes", nargs="+", help="ISO codes to find related languages for")
parser.add_argument("--scripture-dir", type=Path, default=Path(SIL_NLP_ENV.mt_scripture_dir), help="Directory containing scripture files")
parser.add_argument("--all-related", action='store_true', help="List all related scriptures without filtering to those that are part of NLLB")
parser.add_argument("--no-related", action='store_true', help="Only list scriptures in the specified languages and not in related languages")
parser.add_argument(
"--scripture-dir",
type=Path,
default=Path(SIL_NLP_ENV.mt_scripture_dir),
help="Directory containing scripture files",
)
parser.add_argument(
"--all-related",
action="store_true",
help="List all related scriptures without filtering to those that are part of NLLB",
)
parser.add_argument(
"--no-related",
action="store_true",
help="Only list scriptures in the specified languages and not in related languages",
)
parser.add_argument("--output", type=Path, help="Output to the specified file.")

args = parser.parse_args()

# Create a custom logger
logger = logging.getLogger(__name__)
#logger.basicConfig()

# Set the global logging level
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(message)s')
logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(message)s")

if args.output:
# Create handler for the file output.
@@ -115,30 +157,29 @@ def main():
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


language_data, country_data, family_data = load_language_data(LANGUAGE_FAMILY_FILE)
projects_dir = SIL_NLP_ENV.pt_projects_dir
scripture_dir = Path(args.scripture_dir)

if not language_data:
        logger.error("Failed to load language data.")
return

# Get equivalent ISO codes for input
iso_codes = get_equivalent_isocodes(args.iso_codes)

if args.no_related:

# Option 2: No files in related languages, only equivalent ISO codes
codes_to_find = list(iso_codes)
logger.info(f"\nConsidering only the specified iso codes and their equivalents. {codes_to_find}")

else:
# Find related ISO codes
codes_to_find = find_related_isocodes(list(iso_codes), language_data, country_data, family_data)
logger.info(f"\nFound {len(codes_to_find)} related languages:\n{codes_to_find}.")

if not args.all_related:
if not args.all_related:
# Option 3 (default): Filter to NLLB languages
codes_to_find = [iso for iso in codes_to_find if iso in NLLB_ISO_SET]
logger.info(f"\nFound {len(codes_to_find)} specified or related languages in NLLB:\n{codes_to_find}")
@@ -148,10 +189,29 @@

# Get all possible 2 and 3 letter codes for the related languages
all_possible_codes = get_equivalent_isocodes(codes_to_find)

# Find files matching the codes
files = get_files_by_iso(all_possible_codes, scripture_dir)
existing_projects, missing_projects = split_files_by_projects(files, projects_dir)

# Filter out AI and XRI files, and others.
excluded_patterns = [
"XRI",
"600M",
"3.3B",
"1.3B",
"words",
"name",
"clean",
"transcription",
"matthew",
"mark",
"mrk",
"luk",
]
filtered_files = filter_files(files, excluded_patterns)
print(f"There are {len(files)} files and {len(files)-len(filtered_files)} were filtered out.")

existing_projects, missing_projects = split_files_by_projects(filtered_files, projects_dir)

# Display results
if existing_projects:
Expand All @@ -163,12 +223,13 @@ def main():
logger.info(f"\nThese {len(missing_projects)} files don't have a corresponding project folder:")
for file, _ in missing_projects.items():
logger.info(f"{file.stem}")
logger.info(f"\nAll the files:")
for file in files:
logger.info("\nFiltered files:")
for file in filtered_files:
logger.info(f" - {file.stem}")

if not files:
logger.info("\nCouldn't find any Scripture files in these languages.")


if __name__ == "__main__":
main()
main()
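
A minimal usage sketch (hypothetical ISO code and output file; assumes the script is run as a module so that its relative imports resolve):

    python -m silnlp.common.find_by_iso swh --output swh_related.txt

By default this filters the related languages to those covered by NLLB; pass --all-related to list every related scripture, or --no-related to restrict the search to the specified codes and their equivalents.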