diff --git a/.qlty/qlty.toml b/.qlty/qlty.toml index a08286ac5..de8fa4ced 100644 --- a/.qlty/qlty.toml +++ b/.qlty/qlty.toml @@ -104,12 +104,8 @@ threshold = 10 enabled = true [smells.function_complexity] -threshold = 10 -enabled = true - -[smells.duplication] +threshold = 15 enabled = true -threshold = 20 [[source]] name = "default" diff --git a/avaframe/ana1Tests/rotationTest.py b/avaframe/ana1Tests/rotationTest.py index 7eaa1be27..32812257a 100644 --- a/avaframe/ana1Tests/rotationTest.py +++ b/avaframe/ana1Tests/rotationTest.py @@ -11,6 +11,7 @@ # Local imports # import config and init tools from avaframe.in3Utils import fileHandlerUtils as fU +from avaframe.in3Utils import cfgUtils from avaframe.log2Report import generateReport as gR from avaframe.in3Utils import geoTrans as gT import avaframe.in2Trans.rasterUtils as IOf @@ -63,7 +64,7 @@ def mainRotationTest(avalancheDir, energyLineTestCfg, com1DFACfg, dem, simDF, re fU.makeADir(outDir) # get reference angle simName = simDF.loc[refSimRowHash, 'simName'] - relName = (simName.split('_'))[0] + relName = cfgUtils.parseSimName(simName)["releaseName"] thetaRef = float(relName[3:]) for rowSimHash in simDF.index: # rotate results to be able to proceed to the aimec analysis @@ -119,7 +120,7 @@ def rotateDFAResults(avalancheDir, simDF, rowSimHash, resTypeList, thetaRef, com """ log.debug('Rotating simulation: %s' % rowSimHash) simName = simDF.loc[rowSimHash, 'simName'] - relName = (simName.split('_'))[0] + relName = cfgUtils.parseSimName(simName)["releaseName"] theta = float(relName[3:]) simDF.loc[rowSimHash, 'relAngle'] = theta thetaRot = theta - thetaRef diff --git a/avaframe/ana3AIMEC/dfa2Aimec.py b/avaframe/ana3AIMEC/dfa2Aimec.py index 45131a047..115ac43f3 100644 --- a/avaframe/ana3AIMEC/dfa2Aimec.py +++ b/avaframe/ana3AIMEC/dfa2Aimec.py @@ -9,6 +9,7 @@ # local modules from avaframe.in3Utils import fileHandlerUtils as fU +from avaframe.in3Utils import cfgUtils # create local logger # change log level in calling 
module to DEBUG to see log messages @@ -52,12 +53,17 @@ def getMBInfo(avaDir, inputsDF, comMod, simName=''): message = 'No mass log file found in directory %s' % (str(dir)) log.error(message) raise FileNotFoundError(message) - mbNames = sorted(set(mbFiles), key=lambda s: (str(s).split("_")[1], str(s).split("_")[2], str(s).split("_")[4])) + # Sort mass balance files by simName components using parseSimName + mbNames = sorted(set(mbFiles), key=lambda f: ( + # Extract simName by removing 'mass_' prefix (5 characters) and parse components + cfgUtils.parseSimName(f.stem[5:])["simHash"], + cfgUtils.parseSimName(f.stem[5:])["modName"], + cfgUtils.parseSimName(f.stem[5:])["simType"] + )) for mFile in mbNames: - name = mFile.stem - nameParts = name.split('_') - simName = ('_'.join(nameParts[1:])) + # Extract simName from filename (remove 'mass_' prefix) + simName = mFile.stem[5:] simRowHash = inputsDF[inputsDF['simName'] == simName].index[0] inputsDF.loc[simRowHash, 'massBal'] = mFile log.debug('Added to inputsDF[massBal] %s' % (mFile)) diff --git a/avaframe/ana4Stats/probAna.py b/avaframe/ana4Stats/probAna.py index 176435977..eb7d5dc5f 100644 --- a/avaframe/ana4Stats/probAna.py +++ b/avaframe/ana4Stats/probAna.py @@ -461,7 +461,7 @@ def probAnalysis(avaDir, cfg, modName, parametersDict="", inputDir="", probConf= fU.makeADir(outDir) # fetch all result files and filter simulations according to parametersDict - if modName.lower() == "com1dfa": + if modName.lower() in ["com1dfa", "com5snowslide", "com6rockavalanche", "com8motpsa", "com9motvoellmy"]: simNameList = cfgHandling.filterSims(avaDir, parametersDict, specDir=inputDir, simDF=simDFActual) filtering = True else: @@ -688,7 +688,13 @@ def createSampleFromConfig(avaDir, cfgProb, comMod): _, thReadFromShp = checkParameterSettings(cfgStart, varParList) modNameString = str(pathlib.Path(comMod.__file__).stem) - if modNameString.lower() in ["com1dfa", "com8motpsa"]: + if modNameString.lower() in [ + "com1dfa", + "com5snowslide", 
+ "com6rockavalanche", + "com8motpsa", + "com9motvoellmy", + ]: # check if thickness parameters are actually read from shp file _, thReadFromShp = checkParameterSettings(cfgStart, varParList) else: @@ -1079,7 +1085,7 @@ def createCfgFiles(paramValuesDList, comMod, cfg, cfgPath=""): cfgStart[section][par] = str(pVal[index]) else: cfgStart["GENERAL"][par] = str(pVal[index]) - if modName.lower() == "com1dfa": + if modName.lower() in ["com1dfa", "com5snowslide", "com6rockavalanche"]: cfgStart["VISUALISATION"]["scenario"] = str(count1) cfgStart["INPUT"]["thFromIni"] = paramValuesD["thFromIni"] if "releaseScenario" in paramValuesD.keys(): diff --git a/avaframe/ana5Utils/DFAPathGeneration.py b/avaframe/ana5Utils/DFAPathGeneration.py index 4b38b6c47..25d46ec24 100644 --- a/avaframe/ana5Utils/DFAPathGeneration.py +++ b/avaframe/ana5Utils/DFAPathGeneration.py @@ -690,7 +690,7 @@ def saveSplitAndPath(avalancheDir, simDFrow, splitPoint, avaProfileMass, dem): avaProfileMass['y'] = avaProfileMass['y'] + dem['originalHeader']['yllcenter'] # get projection from release shp layer simName = simDFrow['simName'] - relName = simName.split('_')[0] + relName = cfgUtils.parseSimName(simName)["releaseName"] inProjection = pathlib.Path(avalancheDir, 'Inputs', 'REL', relName + '.prj') # save profile in Inputs pathAB = pathlib.Path(avalancheDir, 'Outputs', 'ana5Utils', 'DFAPath', 'massAvgPath_%s_AB_aimec' % simName) diff --git a/avaframe/com1DFA/com1DFA.py b/avaframe/com1DFA/com1DFA.py index c0eda26d4..021b49b42 100644 --- a/avaframe/com1DFA/com1DFA.py +++ b/avaframe/com1DFA/com1DFA.py @@ -3,12 +3,14 @@ """ import copy +import inspect import logging import math import os import pathlib import pickle import platform +import re import time from datetime import datetime from functools import partial @@ -651,7 +653,9 @@ def prepareInputData(inputSimFiles, cfg): ) else: secRelRasterPath = pathlib.Path( - cfg["GENERAL"]["avalancheDir"], "Inputs", cfg["INPUT"]["secondaryRelThFile"] + 
cfg["GENERAL"]["avalancheDir"], + "Inputs", + cfg["INPUT"]["secondaryRelThFile"], ) secrelRasterDict = IOf.readRaster(secRelRasterPath) secondaryReleaseLine = { @@ -1313,7 +1317,10 @@ def initializeSimulation(cfg, outDir, demOri, inputSimLines, logName): ) log.info( "Entrainment area raster derived from %s saved to %s" - % (inputSimLines["entResInfo"]["entThFileType"], str(outDir / "entrainmentRaster")) + % ( + inputSimLines["entResInfo"]["entThFileType"], + str(outDir / "entrainmentRaster"), + ) ) # surfacic entrainment mass available (unit kg/m²) @@ -2118,7 +2125,6 @@ def DFAIterate(cfg, particles, fields, dem, inputSimLines, outDir, cuSimName, si # Update dtSave to remove the initial timestep we just saved dtSave = updateSavingTimeStep(dtSaveOriginal, cfgGen, t) - # export particles properties for visulation if cfg["VISUALISATION"].getboolean("writePartToCSV"): particleTools.savePartToCsv( @@ -2569,7 +2575,15 @@ def writeMBFile(infoDict, avaDir, logName): def computeEulerTimeStep( - cfg, particles, fields, zPartArray0, dem, tCPU, frictType, resistanceType, reportAreaInfo + cfg, + particles, + fields, + zPartArray0, + dem, + tCPU, + frictType, + resistanceType, + reportAreaInfo, ): """compute next time step using an euler forward scheme @@ -2735,7 +2749,11 @@ def releaseSecRelArea(cfg, particles, fields, dem, zPartArray0, reportAreaInfo): else: secondaryReleaseInfo["rasterData"] = secRelRaster secRelParticles = initializeParticles( - cfg, secondaryReleaseInfo, dem, relThField=secRelRaster, thName="secondaryRel" + cfg, + secondaryReleaseInfo, + dem, + relThField=secRelRaster, + thName="secondaryRel", ) # release secondary release area by just appending the particles log.info( @@ -3023,7 +3041,11 @@ def exportFields( outFile = outDirPeak / dataName useCompression = cfg["EXPORTS"].getboolean("useCompression") IOf.writeResultToRaster( - dem["originalHeader"], resField, outFile, flip=True, useCompression=useCompression + dem["originalHeader"], + resField, + outFile, + 
flip=True, + useCompression=useCompression, ) log.debug( "Results parameter: %s has been exported to Outputs/peakFiles for time step: %.2f " @@ -3042,10 +3064,74 @@ def exportFields( outFile = outDirPeakAll / dataName useCompression = cfg["EXPORTS"].getboolean("useCompression") IOf.writeResultToRaster( - dem["originalHeader"], resField, outFile, flip=True, useCompression=useCompression + dem["originalHeader"], + resField, + outFile, + flip=True, + useCompression=useCompression, ) +def _findWrapperModuleInStack(): + """Find wrapper module name by inspecting the call stack. + + Searches the call stack for wrapper modules (e.g., com5SnowSlide, com6RockAvalanche) + that are calling into com1DFA functions. + + Returns + ------- + str or None + Wrapper module name if found (e.g., "com6RockAvalanche"), None otherwise + """ + for frameInfo in inspect.stack(): + frameModule = frameInfo.frame.f_globals.get("__name__", "") + # Look for modules matching comN{Name}.comN{Name} pattern + # but not com1DFA.com1DFA itself + if frameModule.startswith("avaframe.com"): + # Extract the last component (the actual module name) + moduleName = frameModule.split(".")[-1] + # Check if it matches the comN pattern (starts with "com" followed by a digit) + if re.match(r"^com\d+", moduleName) and not frameModule.endswith("com1DFA.com1DFA"): + return moduleName + return None + + +def getModuleNames(module): + """Extract module name and short form by checking the call stack for wrapper modules. + + This function checks if we're being called from a wrapper module (e.g., com5SnowSlide, + com6RockAvalanche) by inspecting the call stack. If found, it uses the wrapper's name. + Otherwise, it falls back to the passed module parameter. 
+ + Parameters + ---------- + module: module + The module object to extract names from (fallback if no wrapper is found) + + Returns + ------- + tuple + (modName, modNameShort) where modName is the full name (e.g., "com1DFA") + and modNameShort is the short form (e.g., "com1") + """ + # Check for wrapper module in call stack + modName = _findWrapperModuleInStack() + + # Fall back to passed module if no wrapper found + if not modName: + modName = module.__name__.split(".")[-1] + + # Special case: com7Regional should be treated as com1DFA + if modName == "com7Regional": + modName = "com1DFA" + + # Extract short name (com1, com8, etc.) + shortModMatch = re.match(r"^(com\d+)", modName) + modNameShort = shortModMatch.group(1) if shortModMatch else modName + + return modName, modNameShort + + def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting="", module=com1DFA): """Prepare a dictionary with simulations that shall be run with varying parameters following the variation dict @@ -3070,8 +3156,8 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting simType and contains full configuration configparser object for simulation run """ - # extract the name of the module - modName = module.__name__.split(".")[-1] + # extract the full module name and short form (e.g., "com1DFA" -> "com1") + modName, modNameShort = getModuleNames(module) # get list of simulation types that are desired if "simTypeList" in variationDict: @@ -3176,7 +3262,7 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting cfgSim["INPUT"]["secondaryRelThFile"] = pathToSecRel inputSimFiles["entResInfo"]["secondaryRelRemeshed"] = remeshedSecRel - if modName == "com1DFA": + if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]: # check if spatialVoellmy is chosen that friction fields have correct extent if cfgSim["GENERAL"]["frictModel"].lower() == "spatialvoellmy": dem = 
IOf.readRaster(pathlib.Path(cfgSim["GENERAL"]["avalancheDir"], "Inputs", pathToDem)) @@ -3271,7 +3357,7 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting pathToDemFull = pathlib.Path(cfgSim["GENERAL"]["avalancheDir"], "Inputs", pathToDem) - if modName == "com1DFA": + if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]: # if frictModel is samosATAuto compute release vol if cfgSim["GENERAL"]["frictModel"].lower() == "samosatauto": relVolume = fetchRelVolume(rel, cfgSim, pathToDemFull, inputSimFiles["secondaryRelFile"]) @@ -3287,7 +3373,7 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting # set frictModelIndicator, this needs to happen AFTER checkCfgFrictModel frictIndi = com1DFATools.setFrictTypeIndicator(cfgSim) - elif modName == "com8MoTPSA": + elif modName in ["com8MoTPSA", "com9MoTVoellmy"]: relVolume = fetchRelVolume(rel, cfgSim, pathToDemFull, inputSimFiles["secondaryRelFile"]) # set Volume class identificator @@ -3304,6 +3390,7 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting [ relNameSim, simHash, + modNameShort, defID, frictIndi or volIndi, row._asdict()["simTypeList"], @@ -3321,7 +3408,7 @@ def prepareVarSimDict(standardCfg, inputSimFiles, variationDict, simNameExisting "relFile": rel, "cfgSim": cfgSimObject, } - if modName == "com1DFA": + if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]: # write configuration file, dont need to write cfg file for com8MoTPSA (does this later when creating rcf file) cfgUtils.writeCfgFile( cfgSimObject["GENERAL"]["avalancheDir"], diff --git a/avaframe/com1DFA/com1DFATools.py b/avaframe/com1DFA/com1DFATools.py index a1b52b2b5..7c940f61b 100644 --- a/avaframe/com1DFA/com1DFATools.py +++ b/avaframe/com1DFA/com1DFATools.py @@ -161,7 +161,7 @@ def compareSimCfgToDefaultCfgCom1DFA(simCfg, module=com1DFA): # sphKernelSize is set during runtime, make sure it is not reported # as changed if default is set to 
meshCellSize - if modName == "com1DFA": + if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche"]: if defCfg["GENERAL"]["sphKernelRadius"] == "meshCellSize": if simCfg["GENERAL"]["sphKernelRadius"] == simCfg["GENERAL"]["meshCellSize"]: excludeItems.append("root['GENERAL']['sphKernelRadius']") diff --git a/avaframe/in3Utils/cfgUtils.py b/avaframe/in3Utils/cfgUtils.py index 69cd0a5e5..d4c9ef4b5 100644 --- a/avaframe/in3Utils/cfgUtils.py +++ b/avaframe/in3Utils/cfgUtils.py @@ -1,5 +1,37 @@ """ -Utilities for handling configuration files +Utilities for handling configuration files and simulation naming + +This module provides functions for: +- Configuration file reading, writing, and merging +- Configuration comparison and hashing +- Simulation name parsing and construction + +Simulation Name Format +---------------------- +AvaFrame uses structured simulation names with two supported formats: + +**New format (with module name):** + relName_simHash_modName_defID_[frictIndi]_simType_modelType[_resType][_timeStep] + +**Old format (without module name):** + relName_simHash_[defID]_[frictIndi]_simType_modelType[_resType][_timeStep] + +Where: + - relName: Release area scenario name (required) + - simHash: Configuration hash (required, 10 characters) + - modName: Short module name - "com1", "com2", etc. (new format only) + - defID: Default indicator - "C" or "D" (defaults to "C") (required) + - frictIndi: Friction calibration - "S", "M", or "L" (optional) + - simType: Simulation type - "null", "ent", "res", "entres" (required) + - modelType: Model type - "dfa", etc. (required) + - resType: Result type - "ppr", "pft", "pfv", etc. (filename only) + - timeStep: Time step value (filename only) + +The module name in the new format uses SHORT form only (e.g., "com1" not "com1DFA"). +This was implemented in 2025-12 to support better organization and filtering of simulations. + +Use `parseSimName()` to extract components from any simulation name. 
+Backward compatibility is maintained - old format names are still supported. """ @@ -408,6 +440,123 @@ def readCfgFile(avaDir, module="", fileName=""): return cfg +def parseSimName(name): + """Parse simulation name handling both old and new formats. + + Auto-detects: + - Old format: relName_simHash_defID_[frictIndi]_simType_modelType[_resType][_timeStep] + - New format: relName_simHash_modName_defID_[frictIndi]_simType_modelType[_resType][_timeStep] + [ ] denotes optional items + + Parameters + ---------- + name : str + Simulation name or full filename to parse + + Returns + ------- + dict + Dictionary with keys: + - releaseName: str (required) + - simHash: str (required) + - modName: str (required, "NA" for old format) + - defID: str (required, defaults to "C") + - frictIndi: str | None (optional, values: "S", "M", "L") + - simType: str (required) + - modelType: str (required) + - resType: str | None (optional, only in filenames) + - timeStep: str | None (optional, only in filenames) + + Raises + ------ + ValueError + If required components are missing or format is invalid + + Examples + -------- + >>> parseSimName("release1_a1b2c3_C_S_ent_dfa") + {'releaseName': 'release1', 'simHash': 'a1b2c3', 'modName': 'NA', 'defID': 'C', + 'frictIndi': 'S', 'simType': 'ent', 'modelType': 'dfa', 'resType': None, 'timeStep': None} + + >>> parseSimName("release1_a1b2c3_com1_C_S_ent_dfa") + {'releaseName': 'release1', 'simHash': 'a1b2c3', 'modName': 'com1', 'defID': 'C', + 'frictIndi': 'S', 'simType': 'ent', 'modelType': 'dfa', 'resType': None, 'timeStep': None} + """ + + # Step 1: Handle _AF_ separator + if "_AF_" in name: + nameParts = name.split("_AF_") + releaseName = nameParts[0] + infoParts = nameParts[1].split("_") + else: + nameParts = name.split("_") + releaseName = nameParts[0] + infoParts = nameParts[1:] + + # Step 2: Extract simHash (always first in infoParts) + if len(infoParts) < 1: + raise ValueError(f"Invalid simName format: no simHash found in '{name}'") + 
simHash = infoParts[0] + + # Step 3: Detect format via module name pattern (com\d+ with optional letters) + # Matches both "com1" and "com1DFA", but extracts only short form (e.g., "com1") + modulePattern = re.compile(r"^com\d+[A-Za-z]*$") + shortModPattern = re.compile(r"^(com\d+)") + + if len(infoParts) > 1 and modulePattern.match(infoParts[1]): + # NEW FORMAT - extract short module name (e.g., "com1" from "com1DFA" or "com1") + match = shortModPattern.match(infoParts[1]) + modName = match.group(1) if match else infoParts[1] + remainingParts = infoParts[2:] # Start after modName + else: + # OLD FORMAT + modName = "NA" + remainingParts = infoParts[1:] # Start after simHash + + # Step 4: Detect optional indicators + defID = "C" # Default + frictIndi = None + offset = 0 + + if len(remainingParts) > 0 and remainingParts[0] in ["C", "D"]: + defID = remainingParts[0] + offset = 1 + + if len(remainingParts) > offset and remainingParts[offset] in ["S", "M", "L"]: + frictIndi = remainingParts[offset] + offset += 1 + + # Step 5: Extract required components (simType, modelType) + if len(remainingParts) < offset + 2: + raise ValueError(f"Invalid simName format: missing required components in '{name}'") + + simType = remainingParts[offset] + modelType = remainingParts[offset + 1] + + # Step 6: Extract optional file components (resType, timeStep) + resType = None + timeStep = None + + if len(remainingParts) > offset + 2: + resType = remainingParts[offset + 2] + + if len(remainingParts) > offset + 3: + timeStep = remainingParts[offset + 3] + + # Step 7: Return structured dictionary + return { + "releaseName": releaseName, + "simHash": simHash, + "modName": modName, + "defID": defID, + "frictIndi": frictIndi, + "simType": simType, + "modelType": modelType, + "resType": resType, + "timeStep": timeStep, + } + + def cfgHash(cfg, typeDict=False): """UID hash of a config. 
Given a configParser object cfg, or a dictionary - then typeDict=True, returns a uid hash @@ -530,14 +679,8 @@ def createConfigurationInfo( for cFile in configFiles: if "sourceConfiguration" not in str(cFile): simName = pathlib.Path(cFile).stem - if "_AF_" in simName: - nameParts = simName.split("_AF_") - infoParts = nameParts[1].split("_") - - else: - nameParts = simName.split("_") - infoParts = nameParts[1:] - simHash = infoParts[0] + # Extract simHash using parseSimName + simHash = parseSimName(simName)["simHash"] cfgObject = readCfgFile(avaDir, fileName=cFile) simDF = appendCgf2DF(simHash, simName, cfgObject, simDF) diff --git a/avaframe/in3Utils/fileHandlerUtils.py b/avaframe/in3Utils/fileHandlerUtils.py index 1c1b76b71..91f42ea75 100644 --- a/avaframe/in3Utils/fileHandlerUtils.py +++ b/avaframe/in3Utils/fileHandlerUtils.py @@ -13,6 +13,7 @@ # Local imports import avaframe.in2Trans.rasterUtils as IOf +import avaframe.in3Utils.cfgUtils as cfgUtils # create local logger # change log level in calling module to DEBUG to see log messages @@ -600,61 +601,41 @@ def makeSimDF(inputDir, avaDir="", simID="simID"): data["files"].append(datafiles[m]) name = datafiles[m].stem data["names"].append(name) - if "_AF_" in name: - nameParts = name.split("_AF_") - fNamePart = nameParts[0] + "_AF" - relNameSim = nameParts[0] - infoParts = nameParts[1].split("_") + # Parse the filename to extract all components + simNameParts = cfgUtils.parseSimName(name) + + # Populate data dictionary from parsed result + data["releaseArea"].append(simNameParts["releaseName"]) + data[simID].append(simNameParts["simHash"]) + data["isDefault"].append(simNameParts["defID"] if simNameParts["defID"] != "_C_" else None) + data["frictCalib"].append(simNameParts["frictIndi"]) + data["simType"].append(simNameParts["simType"]) + data["modelType"].append(simNameParts["modelType"]) + data["resType"].append(simNameParts["resType"] if simNameParts["resType"] else "") + 
data["timeStep"].append(simNameParts["timeStep"] if simNameParts["timeStep"] else "") + + # Reconstruct simName (without resType and timeStep) + # Preserve _AF_ separator if present in original name + if "_AF_" in name: + simNameBase = simNameParts["releaseName"] + "_AF_" + simNameParts["simHash"] else: - nameParts = name.split("_") - fNamePart = nameParts[0] - relNameSim = nameParts[0] - infoParts = nameParts[1:] - - data["releaseArea"].append(relNameSim) - data[simID].append(infoParts[0]) - - indiStr = ["_C_", "_D_"] - if any(x in name for x in indiStr): - data["isDefault"].append(infoParts[1]) - # now check for friction calibration info - frictIndi = ["_S_", "_M_", "_L_"] - if any(x in name for x in frictIndi): - data["frictCalib"].append(infoParts[2]) - j = 1 # j indicates whether there's an additional info - else: - data["frictCalib"].append(None) - j = 0 - - data["simType"].append(infoParts[2 + j]) - data["modelType"].append(infoParts[3 + j]) - data["resType"].append(infoParts[4 + j]) - data["simName"].append(fNamePart + "_" + ("_".join(infoParts[0 : (4 + j)]))) - - header = IOf.readRasterHeader(datafiles[m]) - data["cellSize"].append(header["cellsize"]) - if len(infoParts) == (6 + j): - data["timeStep"].append(infoParts[5 + j]) - else: - data["timeStep"].append("") - - # If it still is an 'old' simname - # This can be removed at one point - else: - data["isDefault"].append(None) - data["frictCalib"].append(None) - data["simType"].append(infoParts[1]) - data["modelType"].append(infoParts[2]) - data["resType"].append(infoParts[3]) - data["simName"].append(fNamePart + "_" + ("_".join(infoParts[0:3]))) - - header = IOf.readRasterHeader(datafiles[m]) - data["cellSize"].append(header["cellsize"]) - if len(infoParts) == 5: - data["timeStep"].append(infoParts[4]) - else: - data["timeStep"].append("") + simNameBase = simNameParts["releaseName"] + "_" + simNameParts["simHash"] + + parts = [simNameBase] + if simNameParts["modName"] != "NA": + 
parts.append(simNameParts["modName"]) + # Only add defID if it was explicitly in the original filename + if "_C_" in name or "_D_" in name: + parts.append(simNameParts["defID"]) + # Only add frictIndi if it was in the original + if simNameParts["frictIndi"]: + parts.append(simNameParts["frictIndi"]) + parts.extend([simNameParts["simType"], simNameParts["modelType"]]) + data["simName"].append("_".join(parts)) + + header = IOf.readRasterHeader(datafiles[m]) + data["cellSize"].append(header["cellsize"]) # Set name of avalanche if avaDir is given if avaDir != "": @@ -730,43 +711,44 @@ def makeSimFromResDF(avaDir, comModule, inputDir="", simName=""): for file in datafiles: name = file.stem - if "_AF_" in name: - nameParts = name.split("_AF_") - fNamePart = nameParts[0] + "_AF" - relNameSim = nameParts[0] - infoParts = nameParts[1].split("_") - resType = infoParts[-1] + # Parse the filename to extract components + simNameParts = cfgUtils.parseSimName(name) + + # Extract simName (without resType/timeStep) and resType + resType = simNameParts["resType"] if simNameParts["resType"] else name.split("_")[-1] + # Reconstruct simName without resType and timeStep + # Preserve _AF_ separator if present in original name + if "_AF_" in name: + simNameBase = simNameParts["releaseName"] + "_AF_" + simNameParts["simHash"] else: - nameParts = name.split("_") - fNamePart = nameParts[0] - relNameSim = nameParts[0] - infoParts = nameParts[1:] - resType = infoParts[-1] - simName = fNamePart + "_" + ("_".join(infoParts[0:-1])) + simNameBase = simNameParts["releaseName"] + "_" + simNameParts["simHash"] + + parts = [simNameBase] + if simNameParts["modName"] != "NA": + parts.append(simNameParts["modName"]) + # Only add defID if it was explicitly in the original filename + if "_C_" in name or "_D_" in name: + parts.append(simNameParts["defID"]) + # Only add frictIndi if it was in the original + if simNameParts["frictIndi"]: + parts.append(simNameParts["frictIndi"]) + 
parts.extend([simNameParts["simType"], simNameParts["modelType"]]) + simName = "_".join(parts) + # add line in the DF if the simulation does not exist yet if simName not in dataDF.simName.values: newLine = pd.DataFrame([[simName]], columns=["simName"], index=[simName]) dataDF = pd.concat([dataDF, newLine], ignore_index=False) - dataDF.loc[simName, "releaseArea"] = relNameSim - dataDF.loc[simName, "simHash"] = infoParts[0] - # TODO: remove once all simNames are updated to include C or D as simModified - if len(infoParts) == 6: # this is the _C_M_ etc variant - dataDF.loc[simName, "simModified"] = infoParts[1] - dataDF.loc[simName, "simType"] = infoParts[3] - dataDF.loc[simName, "modelType"] = infoParts[4] - elif len(infoParts) == 5: - dataDF.loc[simName, "simModified"] = infoParts[1] - dataDF.loc[simName, "simType"] = infoParts[2] - dataDF.loc[simName, "modelType"] = infoParts[3] - elif len(infoParts) == 4: - dataDF.loc[simName, "simModified"] = "not specified" - dataDF.loc[simName, "simType"] = infoParts[1] - dataDF.loc[simName, "modelType"] = infoParts[2] + dataDF.loc[simName, "releaseArea"] = simNameParts["releaseName"] + dataDF.loc[simName, "simHash"] = simNameParts["simHash"] + # Only set simModified if defID was explicitly in filename + if "_C_" in name or "_D_" in name: + dataDF.loc[simName, "simModified"] = simNameParts["defID"] else: - message = "simName format not recognized for simName: %s" % simName - log.error(message) - raise AssertionError(message) + dataDF.loc[simName, "simModified"] = "not specified" + dataDF.loc[simName, "simType"] = simNameParts["simType"] + dataDF.loc[simName, "modelType"] = simNameParts["modelType"] # add info about the cell size header = IOf.readRasterHeader(file) diff --git a/avaframe/log2Report/generateReport.py b/avaframe/log2Report/generateReport.py index 00ea810fd..7ec6e0b6e 100644 --- a/avaframe/log2Report/generateReport.py +++ b/avaframe/log2Report/generateReport.py @@ -11,6 +11,9 @@ from tabulate import tabulate from 
datetime import datetime +# Local imports +from avaframe.in3Utils import cfgUtils + # create local logger # change log level in calling module to DEBUG to see log messages log = logging.getLogger(__name__) @@ -252,13 +255,8 @@ def checkAndCleanReportDictOnWinIssue872(reportDictList): for k, listItem in enumerate(reportDictList): simName = listItem['simName']['name'] - if '_AF_' in simName: - nameParts = simName.split('_AF_') - else: - nameParts = simName.split('_') - - # This is the proper simName - simNameClean = nameParts[0] + # Extract release name using parseSimName + simNameClean = cfgUtils.parseSimName(simName)["releaseName"] if listItem['Simulation Parameters']['Release Area Scenario'] != simNameClean: reportDictList[k]['Simulation Parameters']['Release Area Scenario'] = simNameClean diff --git a/avaframe/out1Peak/outPlotAllPeak.py b/avaframe/out1Peak/outPlotAllPeak.py index f3e392726..045511e37 100644 --- a/avaframe/out1Peak/outPlotAllPeak.py +++ b/avaframe/out1Peak/outPlotAllPeak.py @@ -53,7 +53,10 @@ def plotAllPeakFields(avaDir, cfgFLAGS, modName, demData=""): inputDir = avaDir / "Outputs" / modName / "peakFiles" inDir = avaDir / "Inputs" peakFilesDF = fU.makeSimDF(inputDir, avaDir=avaDir) - if modName in ["com1DFA", "com9MoTVoellmy"] and demData == "": + if ( + modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche", "com9MoTVoellmy", "com8MoTPSA"] + and demData == "" + ): configurationDF = cfgUtils.createConfigurationInfo(avaDir, comModule=modName) configurationDF = configurationDF.rename(columns={"resType": "resTypeList"}) peakFilesDF = ( @@ -104,7 +107,13 @@ def plotAllPeakFields(avaDir, cfgFLAGS, modName, demData=""): # this enables to append simulations to an already existing output without regenerating all plots if not plotName.is_file(): # for comModules load DEM used for computation - if demData == "" and modName in ["com1DFA", "com9MoTVoellmy"]: + if demData == "" and modName in [ + "com1DFA", + "com5SnowSlide", + "com6RockAvalanche", + 
"com9MoTVoellmy", + "com8MoTPSA", + ]: demFile = inDir / row["DEM"] demDataRaster = IOf.readRaster(demFile, noDataToNan=True) demDataField = demDataRaster["rasterData"] diff --git a/avaframe/runScripts/runPlotAreaRefDiffs.py b/avaframe/runScripts/runPlotAreaRefDiffs.py index 752dc5092..9afb82ebf 100644 --- a/avaframe/runScripts/runPlotAreaRefDiffs.py +++ b/avaframe/runScripts/runPlotAreaRefDiffs.py @@ -1,6 +1,7 @@ """ - Run script for plotting a comparison of simulation result to reference polygon +Run script for plotting a comparison of simulation result to reference polygon """ + # Load modules # importing general python modules import pathlib @@ -21,13 +22,13 @@ ################USER Input############# resType = "ppr" thresholdValueSimulation = 0.9 -modName = 'com1DFA' +modName = "com1DFA" ############################################################ # Load avalanche directory from general configuration file cfgMain = cfgUtils.getGeneralConfig() avalancheDir = cfgMain["MAIN"]["avalancheDir"] -outDir = pathlib.Path(avalancheDir, 'Outputs', 'out1Peak') +outDir = pathlib.Path(avalancheDir, "Outputs", "out1Peak") fU.makeADir(outDir) # Start logging @@ -45,16 +46,16 @@ dem = gT.getNormalMesh(dem, num=1) # get real Area dem = DFAtls.getAreaMesh(dem, 1) -dem['originalHeader'] = dem['header'] +dem["originalHeader"] = dem["header"] # read reference data set -inDir = pathlib.Path(avalancheDir, 'Inputs') +inDir = pathlib.Path(avalancheDir, "Inputs") referenceFile, availableFile, _ = gI.getAndCheckInputFiles( inDir, "REFDATA", "POLY", fileExt="shp", fileSuffix="POLY" ) # convert polygon to raster with value 1 inside polygon and 0 outside the polygon referenceLine = shpConv.readLine(referenceFile, "reference", dem) -referenceLine= gT.prepareArea(referenceLine, dem, np.sqrt(2),combine=True, checkOverlap=False) +referenceLine = gT.prepareArea(referenceLine, dem, np.sqrt(2), combine=True, checkOverlap=False) # if available zoom into area provided by crop shp file in 
Inputs/CROPSHAPE cropFile, cropInfo, _ = gI.getAndCheckInputFiles( @@ -64,7 +65,7 @@ cropLine = shpConv.readLine(cropFile, "cropFile", dem) cropLine = gT.prepareArea(cropLine, dem, np.sqrt(2), combine=True, checkOverlap=False) -if modName == 'com1DFA': +if modName in ["com1DFA", "com5SnowSlide", "com6RockAvalanche", "com8MoTPSA", "com9MoTVoellmy"]: # load dataFrame for all configurations of simulations in avalancheDir simDF = cfgUtils.createConfigurationInfo(avalancheDir) # create data frame that lists all available simulations and path to their result type result files @@ -80,35 +81,58 @@ # compute referenceMask and simulationMask and true positive, false positive and false neg. arrays # here thresholdValueReference is set to 0.9 as when converting the polygon to a raster, # values inside polygon are set to 1 and outside to 0 - refMask, compMask, indicatorDict = oPD.computeAreaDiff(referenceLine['rasterData'], - simData['rasterData'], - 0.9, - thresholdValueSimulation, - dem, - cropToArea=cropLine['rasterData']) + refMask, compMask, indicatorDict = oPD.computeAreaDiff( + referenceLine["rasterData"], + simData["rasterData"], + 0.9, + thresholdValueSimulation, + dem, + cropToArea=cropLine["rasterData"], + ) # plot differences - oPD.plotAreaDiff(referenceLine['rasterData'], refMask, simData['rasterData'], compMask, resType, simData['header'], - thresholdValueSimulation, outDir, - indicatorDict, row['simName'], cropFile=cropFile) + oPD.plotAreaDiff( + referenceLine["rasterData"], + refMask, + simData["rasterData"], + compMask, + resType, + simData["header"], + thresholdValueSimulation, + outDir, + indicatorDict, + row["simName"], + cropFile=cropFile, + ) else: # load all result files - resultDir = pathlib.Path(avalancheDir, 'Outputs', modName, 'peakFiles') + resultDir = pathlib.Path(avalancheDir, "Outputs", modName, "peakFiles") peakFilesList = list(resultDir.glob("*_%s.tif" % resType)) + list(resultDir.glob("*_%s.asc" % resType)) for pF in peakFilesList: simData = 
IOf.readRaster(pF) simName = pF.stem # compute referenceMask and simulationMask and true positive, false positive and false neg. arrays - refMask, compMask, indicatorDict = oPD.computeAreaDiff(referenceLine['rasterData'], - simData['rasterData'], - 0.9, - thresholdValueSimulation, - dem, - cropToArea=cropLine['rasterData']) + refMask, compMask, indicatorDict = oPD.computeAreaDiff( + referenceLine["rasterData"], + simData["rasterData"], + 0.9, + thresholdValueSimulation, + dem, + cropToArea=cropLine["rasterData"], + ) # plot differences - oPD.plotAreaDiff(referenceLine['rasterData'], refMask, simData['rasterData'], compMask, resType, - simData['header'], - thresholdValueSimulation, outDir, - indicatorDict, simName, cropFile=cropFile) + oPD.plotAreaDiff( + referenceLine["rasterData"], + refMask, + simData["rasterData"], + compMask, + resType, + simData["header"], + thresholdValueSimulation, + outDir, + indicatorDict, + simName, + cropFile=cropFile, + ) diff --git a/avaframe/tests/test_DFAPathGeneration.py b/avaframe/tests/test_DFAPathGeneration.py index 720eba791..c47b2452a 100644 --- a/avaframe/tests/test_DFAPathGeneration.py +++ b/avaframe/tests/test_DFAPathGeneration.py @@ -203,3 +203,139 @@ def test_getParabolicFit(): # print(splitPoint) # print(angle) assert splitPoint['s'] == 50 + + +def test_getSplitPoint_noPointFound(): + """Test getSplitPoint when no point meets slope criteria - should return top point""" + cfg = configparser.ConfigParser() + cfg['PATH'] = {'slopeSplitPoint': '5', 'dsMin': '5'} # Very low slope requirement + + # Create profile with steep slope everywhere + avaProfile = { + 'x': np.array([0, 10, 20, 30]), + 'y': np.array([0, 10, 20, 30]), + 'z': np.array([50, 30, 10, 0]), # Steep slope throughout + 's': np.array([0, 14.14, 28.28, 42.43]), + 'indStartMassAverage': 0, + 'indEndMassAverage': 3 + } + # Parabolic fit with steep slope at bottom + parabolicFit = {'a': 0.01, 'b': -2, 'c': 50} + + splitPoint = 
DFAPathGeneration.getSplitPoint(cfg['PATH'], avaProfile, parabolicFit) + + # Should return top point when no split point found + assert splitPoint.get('isTopSplitPoint', False) is True + assert splitPoint['x'] == avaProfile['x'][0] + assert splitPoint['y'] == avaProfile['y'][0] + assert splitPoint['z'] == avaProfile['z'][0] + + +def test_getMassAvgPathFromFields(): + """Test computing mass-averaged path from field data""" + # Create simple 5x5 field with flow in the middle + fieldsList = [{ + 'FT': np.array([[0, 0, 0, 0, 0], + [0, 1, 2, 1, 0], + [0, 2, 3, 2, 0], + [0, 1, 2, 1, 0], + [0, 0, 0, 0, 0]]), # Flow thickness + 'FM': np.array([[0, 0, 0, 0, 0], + [0, 5, 10, 5, 0], + [0, 10, 15, 10, 0], + [0, 5, 10, 5, 0], + [0, 0, 0, 0, 0]]), # Flow mass + 'FV': np.array([[0, 0, 0, 0, 0], + [0, 2, 3, 2, 0], + [0, 3, 4, 3, 0], + [0, 2, 3, 2, 0], + [0, 0, 0, 0, 0]]) # Flow velocity + }] + + fieldHeader = { + 'ncols': 5, + 'nrows': 5, + 'xllcenter': 100, + 'yllcenter': 200, + 'cellsize': 5 + } + + dem = { + 'rasterData': np.array([[50, 50, 50, 50, 50], + [40, 40, 40, 40, 40], + [30, 30, 30, 30, 30], + [20, 20, 20, 20, 20], + [10, 10, 10, 10, 10]]) + } + + result = DFAPathGeneration.getMassAvgPathFromFields(fieldsList, fieldHeader, dem) + + # Verify structure + assert 'x' in result + assert 'y' in result + assert 'z' in result + assert 's' in result + assert 'xstd' in result + assert 'ystd' in result + assert 'zstd' in result + + # Verify velocity info is included + assert 'u2' in result + assert 'ekin' in result + assert 'u2std' in result + assert 'ekinstd' in result + assert 'totEKin' in result + + # Should have one time step + assert len(result['x']) == 1 + assert len(result['y']) == 1 + assert len(result['z']) == 1 + + # Coordinates should be relative to origin (xllcenter and yllcenter subtracted) + # Mass-weighted average should be close to center + assert result['x'][0] > 0 # Relative to xllcenter + assert result['y'][0] > 0 # Relative to yllcenter + + +def 
test_getMassAvgPathFromFields_noVelocity(): + """Test getMassAvgPathFromFields without velocity data""" + # Create simple field without velocity info + fieldsList = [{ + 'FT': np.array([[0, 1, 0], + [0, 2, 0], + [0, 0, 0]]), + 'FM': np.array([[0, 5, 0], + [0, 10, 0], + [0, 0, 0]]) + # No FV field + }] + + fieldHeader = { + 'ncols': 3, + 'nrows': 3, + 'xllcenter': 0, + 'yllcenter': 0, + 'cellsize': 10 + } + + dem = { + 'rasterData': np.array([[30, 30, 30], + [20, 20, 20], + [10, 10, 10]]) + } + + result = DFAPathGeneration.getMassAvgPathFromFields(fieldsList, fieldHeader, dem) + + # Verify basic structure + assert 'x' in result + assert 'y' in result + assert 'z' in result + assert 's' in result + + # Velocity info should NOT be present + assert 'u2' not in result + assert 'ekin' not in result + assert 'totEKin' not in result + + # Should have one time step + assert len(result['x']) == 1 diff --git a/avaframe/tests/test_ana1Tests.py b/avaframe/tests/test_ana1Tests.py index 17dd3b75b..03f4c24ab 100644 --- a/avaframe/tests/test_ana1Tests.py +++ b/avaframe/tests/test_ana1Tests.py @@ -8,6 +8,7 @@ import avaframe.ana1Tests.analysisTools as anaTools import avaframe.ana1Tests.energyLineTest as energyLineTest +import avaframe.ana1Tests.rotationTest as rotationTest import avaframe.com1DFA.com1DFA as com1DFA import avaframe.in3Utils.fileHandlerUtils as fU from avaframe.in3Utils import cfgHandling @@ -229,3 +230,65 @@ def test_mainEnergyLineTest(tmp_path): assert abs(resultEnergyTest["runOutZError"]) < 0.02 assert abs(resultEnergyTest["rmseVelocityElevation"]) < 0.02 assert abs(resultEnergyTest["runOutAngleError"]) < 0.0003 + + +# ############# Test rotation test ########## +# ############################################ +def test_initializeRotationTestReport(tmp_path): + """Test report initialization creates correct structure""" + avalancheDir = tmp_path / "testAva" + resTypeList = ["ppr", "pfv", "pft"] + comModule = "com1DFA" + refSimName = "testSim" + flagMass = False + + 
report = rotationTest.initializeRotationTestReport( + avalancheDir, resTypeList, comModule, refSimName, flagMass + ) + + # Check report structure + assert "headerLine" in report + assert report["headerLine"]["type"] == "title" + assert report["headerLine"]["title"] == "Rotation Test for DFA Simulation" + + assert "avaName" in report + assert report["avaName"]["type"] == "avaName" + + assert "time" in report + assert report["time"]["type"] == "time" + + assert "Simulation Parameters" in report + assert report["Simulation Parameters"]["type"] == "list" + assert report["Simulation Parameters"]["DFA module"] == comModule + assert report["Simulation Parameters"]["Reference simulation"] == refSimName + + # Check column names for result tables + assert "Rotation test input simulations" in report + assert "simName" in report["Rotation test input simulations"]["column names"] + + assert "Rotation test Energy line result table" in report + assert "sDiff" in report["Rotation test Energy line result table"]["column names"] + + assert "Rotation test AIMEC result table" in report + assert "sRunout" in report["Rotation test AIMEC result table"]["column names"] + + # Mass columns should not be present when flagMass is False + assert "relMass" not in report["Rotation test AIMEC result table"]["column names"] + + +def test_initializeRotationTestReport_withMass(tmp_path): + """Test report initialization with mass analysis enabled""" + avalancheDir = tmp_path / "testAva" + resTypeList = ["ppr", "pfv", "pft"] + comModule = "com1DFA" + refSimName = "testSim" + flagMass = True + + report = rotationTest.initializeRotationTestReport( + avalancheDir, resTypeList, comModule, refSimName, flagMass + ) + + # Mass columns should be present when flagMass is True + assert "relMass" in report["Rotation test AIMEC result table"]["column names"] + assert "finalMass" in report["Rotation test AIMEC result table"]["column names"] + assert "entMass" in report["Rotation test AIMEC result table"]["column 
names"] diff --git a/avaframe/tests/test_cfgUtils.py b/avaframe/tests/test_cfgUtils.py index 96920697d..0dfef3056 100644 --- a/avaframe/tests/test_cfgUtils.py +++ b/avaframe/tests/test_cfgUtils.py @@ -768,4 +768,280 @@ def test_setStrnanToNan_case_sensitivity(): # Verify all variations of 'nan' were converted for i in range(5): - assert pd.isna(resultDF.at[i, 'column1']) \ No newline at end of file + assert pd.isna(resultDF.at[i, 'column1']) + + +# Tests for parseSimName function + + +def test_parseSimName_oldFormat_basic(): + """Test parsing old format without modName""" + name = "release1_a1b2c3_C_S_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "NA" + assert result["defID"] == "C" + assert result["frictIndi"] == "S" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] is None + assert result["timeStep"] is None + + +def test_parseSimName_oldFormat_minimal(): + """Test parsing minimal old format (no indicators)""" + name = "release1_a1b2c3_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "NA" + assert result["defID"] == "C" # Default + assert result["frictIndi"] is None + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] is None + assert result["timeStep"] is None + + +def test_parseSimName_oldFormat_defID_only(): + """Test parsing old format with defID but no frictIndi""" + name = "release1_a1b2c3_D_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "NA" + assert result["defID"] == "D" + assert result["frictIndi"] is None + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + + +def 
test_parseSimName_newFormat_basic(): + """Test parsing new format with modName""" + name = "release1_a1b2c3_com1_C_S_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com1" + assert result["defID"] == "C" + assert result["frictIndi"] == "S" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] is None + assert result["timeStep"] is None + + +def test_parseSimName_newFormat_com8(): + """Test parsing new format with com8 module""" + name = "release1_a1b2c3_com8_C_M_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com8" + assert result["defID"] == "C" + assert result["frictIndi"] == "M" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + + +def test_parseSimName_newFormat_com9(): + """Test parsing new format with com9 module""" + name = "release1_a1b2c3_com9_D_L_null_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com9" + assert result["defID"] == "D" + assert result["frictIndi"] == "L" + assert result["simType"] == "null" + assert result["modelType"] == "dfa" + + +def test_parseSimName_newFormat_minimal(): + """Test parsing new format without optional indicators""" + name = "release1_a1b2c3_com1_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com1" + assert result["defID"] == "C" # Default + assert result["frictIndi"] is None + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + + +def test_parseSimName_withFile_oldFormat(): + """Test parsing full filename with resType and timeStep (old format)""" + name = 
"release1_a1b2c3_C_S_ent_dfa_ppr_100.5" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "NA" + assert result["defID"] == "C" + assert result["frictIndi"] == "S" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] == "ppr" + assert result["timeStep"] == "100.5" + + +def test_parseSimName_withFile_newFormat(): + """Test parsing full filename with resType and timeStep (new format)""" + name = "release1_a1b2c3_com1_C_S_ent_dfa_pft_50.2" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com1" + assert result["defID"] == "C" + assert result["frictIndi"] == "S" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] == "pft" + assert result["timeStep"] == "50.2" + + +def test_parseSimName_withFile_resTypeOnly(): + """Test parsing filename with resType but no timeStep""" + name = "release1_a1b2c3_com1_C_ent_dfa_pfv" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com1" + assert result["defID"] == "C" + assert result["frictIndi"] is None + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + assert result["resType"] == "pfv" + assert result["timeStep"] is None + + +def test_parseSimName_AF_separator_oldFormat(): + """Test parsing with _AF_ separator (old format)""" + name = "release1_AF_a1b2c3_C_S_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "NA" + assert result["defID"] == "C" + assert result["frictIndi"] == "S" + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + + +def 
test_parseSimName_AF_separator_newFormat(): + """Test parsing with _AF_ separator (new format)""" + name = "release1_AF_a1b2c3_com1_D_ent_dfa" + result = cfgUtils.parseSimName(name) + + assert result["releaseName"] == "release1" + assert result["simHash"] == "a1b2c3" + assert result["modName"] == "com1" + assert result["defID"] == "D" + assert result["frictIndi"] is None + assert result["simType"] == "ent" + assert result["modelType"] == "dfa" + + +def test_parseSimName_all_defID_options(): + """Test all defID options (_C_ and _D_)""" + name_C = "release1_a1b2c3_com1_C_ent_dfa" + result_C = cfgUtils.parseSimName(name_C) + assert result_C["defID"] == "C" + + name_D = "release1_a1b2c3_com1_D_ent_dfa" + result_D = cfgUtils.parseSimName(name_D) + assert result_D["defID"] == "D" + + +def test_parseSimName_all_frictIndi_options(): + """Test all frictIndi options (_S_, _M_, _L_)""" + name_S = "release1_a1b2c3_com1_C_S_ent_dfa" + result_S = cfgUtils.parseSimName(name_S) + assert result_S["frictIndi"] == "S" + + name_M = "release1_a1b2c3_com1_C_M_ent_dfa" + result_M = cfgUtils.parseSimName(name_M) + assert result_M["frictIndi"] == "M" + + name_L = "release1_a1b2c3_com1_C_L_ent_dfa" + result_L = cfgUtils.parseSimName(name_L) + assert result_L["frictIndi"] == "L" + + +def test_parseSimName_complex_releaseName(): + """Test parsing with complex release names""" + name = "myRelease_Test_AF_a1b2c3_com1_C_ent_dfa" + result = cfgUtils.parseSimName(name) + assert result["releaseName"] == "myRelease_Test" + + name2 = "rel123_a1b2c3_com1_ent_dfa" + result2 = cfgUtils.parseSimName(name2) + assert result2["releaseName"] == "rel123" + + +def test_parseSimName_invalid_noSimHash(): + """Test error handling when simHash is missing""" + name = "release1" + with pytest.raises(ValueError, match="Invalid simName format: no simHash found"): + cfgUtils.parseSimName(name) + + +def test_parseSimName_invalid_missingComponents(): + """Test error handling when required components are missing""" + name 
= "release1_a1b2c3_ent" # Missing modelType + with pytest.raises(ValueError, match="Invalid simName format: missing required components"): + cfgUtils.parseSimName(name) + + +def test_parseSimName_invalid_onlyHash(): + """Test error handling when only hash exists""" + name = "release1_a1b2c3" + with pytest.raises(ValueError, match="Invalid simName format: missing required components"): + cfgUtils.parseSimName(name) + + +def test_parseSimName_realWorld_examples(): + """Test with realistic simulation names from codebase""" + # Typical old format from existing simulations + name1 = "release_9ae8f6_null_dfa" + result1 = cfgUtils.parseSimName(name1) + assert result1["releaseName"] == "release" + assert result1["simHash"] == "9ae8f6" + assert result1["modName"] == "NA" + assert result1["simType"] == "null" + assert result1["modelType"] == "dfa" + + # Old format with all indicators + name2 = "avalanche_abc123_C_S_ent_dfa_ppr" + result2 = cfgUtils.parseSimName(name2) + assert result2["releaseName"] == "avalanche" + assert result2["simHash"] == "abc123" + assert result2["modName"] == "NA" + assert result2["defID"] == "C" + assert result2["frictIndi"] == "S" + assert result2["simType"] == "ent" + assert result2["modelType"] == "dfa" + assert result2["resType"] == "ppr" + + # New format example + name3 = "testRel_xyz789_com1_D_M_ent_dfa" + result3 = cfgUtils.parseSimName(name3) + assert result3["releaseName"] == "testRel" + assert result3["simHash"] == "xyz789" + assert result3["modName"] == "com1" + assert result3["defID"] == "D" + assert result3["frictIndi"] == "M" + assert result3["simType"] == "ent" + assert result3["modelType"] == "dfa" \ No newline at end of file diff --git a/avaframe/tests/test_com1DFA.py b/avaframe/tests/test_com1DFA.py index 33374be59..ea96aa329 100644 --- a/avaframe/tests/test_com1DFA.py +++ b/avaframe/tests/test_com1DFA.py @@ -29,7 +29,9 @@ def test_prepareInputData(tmp_path): """test preparing input data""" # setup requuired input data - 
inputSimFiles = {"entResInfo": {"flagEnt": "Yes", "flagRes": "No", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "Yes", "flagRes": "No", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / ".." / "data" / "avaAlr" relFile = avaDir / "Inputs" / "REL" / "relAlr.shp" @@ -72,7 +74,9 @@ def test_prepareInputData(tmp_path): assert inputSimLines["entLine"]["initializedFrom"] == "shapefile" # call function to be tested - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / ".." / "data" / "avaParabola" relFile = avaDir / "Inputs" / "REL" / "release1PF.shp" @@ -102,7 +106,9 @@ def test_prepareInputData(tmp_path): assert inputSimLines["resLine"]["initializedFrom"] == "shapefile" # call function to be tested - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / ".." / "data" / "avaParabola" relFile = avaDir / "Inputs" / "REL" / "release1PF.shp" @@ -130,7 +136,9 @@ def test_prepareInputData(tmp_path): assert inputSimLines["relThField"] == "" # call function to be tested - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / ".." 
/ "data" / "avaParabola" relFile = avaDir / "Inputs" / "REL" / "release1PF.shp" @@ -159,11 +167,16 @@ def test_prepareInputData(tmp_path): assert inputSimLines["releaseLine"]["initializedFrom"] == "raster" assert inputSimLines["releaseLine"]["Name"] == "from raster" assert inputSimLines["releaseLine"]["thickness"] == "from raster" - assert inputSimLines["releaseLine"]["file"] == dirName / "data" / "relThFieldTestFile.asc" + assert ( + inputSimLines["releaseLine"]["file"] + == dirName / "data" / "relThFieldTestFile.asc" + ) assert inputSimLines["releaseLine"]["type"] == "Release from raster" # call function to be tested - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "Yes", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / ".." / "data" / "avaParabola" relFile = avaDir / "Inputs" / "REL" / "release1PF.shp" @@ -205,7 +218,9 @@ def test_prepareInputData(tmp_path): # ) # setup required input data - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / "data" / "avaTestRelTh" relFile = avaDir / "Inputs" / "REL" / "rel1.shp" @@ -246,7 +261,9 @@ def test_prepareInputData(tmp_path): assert inputSimLines["releaseLine"]["initializedFrom"] == "raster" # setup requuired input data - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "Yes"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "Yes"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / "data" / "avaTestRelTh" relFile = avaDir / "Inputs" / "REL" / "rel1.shp" @@ -255,7 +272,9 @@ def test_prepareInputData(tmp_path): inputSimFiles["secondaryRelScenario"] 
= secrelFile inputSimFiles["demFile"] = avaDir / "Inputs" / "testDEM.asc" inputSimFiles["relThFile"] = None - inputSimFiles["secondaryRelThFile"] = avaDir / "Inputs" / "SECREL" / "testSecRel2.asc" + inputSimFiles["secondaryRelThFile"] = ( + avaDir / "Inputs" / "SECREL" / "testSecRel2.asc" + ) inputSimFiles["muFile"] = None inputSimFiles["xiFile"] = None inputSimFiles["kFile"] = None @@ -283,18 +302,24 @@ def test_prepareInputData(tmp_path): assert demOri["header"]["nrows"] == 22 assert inputSimLines["releaseLine"]["thickness"] == ["1.5", "0.7"] assert np.array_equal(inputSimLines["releaseLine"]["Start"], np.asarray([0.0, 9.0])) - assert np.array_equal(inputSimLines["releaseLine"]["Length"], np.asarray([9.0, 5.0])) + assert np.array_equal( + inputSimLines["releaseLine"]["Length"], np.asarray([9.0, 5.0]) + ) assert inputSimLines["releaseLine"]["Name"] == ["releaseNew1", "releaseNew2"] assert inputSimLines["releaseLine"]["ci95"] == ["0.4", "0.1"] assert inputSimLines["secondaryReleaseLine"]["Name"] == "from raster" assert inputSimLines["secondaryReleaseLine"]["thickness"] == "from raster" assert inputSimLines["secondaryReleaseLine"]["initializedFrom"] == "raster" - assert inputSimLines["secondaryReleaseLine"]["type"] == "Secondary release from raster" + assert ( + inputSimLines["secondaryReleaseLine"]["type"] == "Secondary release from raster" + ) assert inputSimLines["releaseLine"]["type"] == "Release" assert inputSimLines["releaseLine"]["initializedFrom"] == "shapefile" # setup requuired input data - inputSimFiles = {"entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "No", "flagRes": "No", "flagSecondaryRelease": "No"} + } dirName = pathlib.Path(__file__).parents[0] avaDir = dirName / "data" / "avaTestRelTh" relFile = avaDir / "Inputs" / "REL" / "testRel2.asc" @@ -343,7 +368,10 @@ def test_prepareInputData(tmp_path): with pytest.raises(AssertionError) as e: assert 
com1DFA.prepareInputData(inputSimFiles, cfg) - assert "One or more release features in relAlr2.shp have holes - check error plots in" in str(e.value) + assert ( + "One or more release features in relAlr2.shp have holes - check error plots in" + in str(e.value) + ) def test_prepareReleaseEntrainment(tmp_path): @@ -386,7 +414,9 @@ def test_prepareReleaseEntrainment(tmp_path): rel = pathlib.Path(tmp_path, "release1PF_test.shp") # call function to be tested - relName, inputSimLines, badName = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName, inputSimLines, badName = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) assert relName == "release1PF_test" assert inputSimLines["entResInfo"]["flagSecondaryRelease"] == "Yes" @@ -429,7 +459,9 @@ def test_prepareReleaseEntrainment(tmp_path): rel = pathlib.Path(tmp_path, "release1PF_test.shp") # call function to be tested - relName2, inputSimLines2, badName2 = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName2, inputSimLines2, badName2 = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) assert relName2 == "release1PF_test" assert inputSimLines2["entResInfo"]["flagSecondaryRelease"] == "Yes" @@ -473,7 +505,9 @@ def test_prepareReleaseEntrainment(tmp_path): rel = pathlib.Path(tmp_path, "release1PF_test.shp") # call function to be tested - relName2, inputSimLines2, badName2 = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName2, inputSimLines2, badName2 = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) # print( # "Test", @@ -492,7 +526,9 @@ def test_prepareReleaseEntrainment(tmp_path): # call function to be tested cfg["GENERAL"]["secRelArea"] = "False" - relName3, inputSimLines3, badName3 = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName3, inputSimLines3, badName3 = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) assert relName3 == "release1PF_test" assert 
inputSimLines3["entResInfo"]["flagSecondaryRelease"] == "No" @@ -516,7 +552,9 @@ def test_prepareReleaseEntrainment(tmp_path): cfg["GENERAL"]["relTh"] = "1.32" # call function to test - relName4, inputSimLines4, badName4 = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName4, inputSimLines4, badName4 = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) assert relName4 == "release1PF_test" assert inputSimLines4["entResInfo"]["flagSecondaryRelease"] == "No" @@ -559,7 +597,9 @@ def test_prepareReleaseEntrainment(tmp_path): "id": ["0", "1"], "initializedFrom": "shapefile", } - relName5, inputSimLines5, badName5 = com1DFA.prepareReleaseEntrainment(cfg, rel, inputSimLines) + relName5, inputSimLines5, badName5 = com1DFA.prepareReleaseEntrainment( + cfg, rel, inputSimLines + ) assert relName5 == "release1PF_test" assert inputSimLines5["entResInfo"]["flagSecondaryRelease"] == "No" @@ -600,7 +640,9 @@ def test_setThickness(): assert lineTh["thickness"] == [1.0, 1.0] assert lineTh["thicknessSource"] == ["ini file", "ini file"] - assert np.array_equal(lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0])) + assert np.array_equal( + lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0]) + ) # call function to be tested lineTh = { @@ -617,7 +659,9 @@ def test_setThickness(): assert lineTh["thickness"] == [1.0, 1.0] assert lineTh["thicknessSource"] == ["ini file", "ini file"] - assert np.array_equal(lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0])) + assert np.array_equal( + lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0]) + ) # call function to be tested cfg["GENERAL"]["entThFromFile"] = "True" @@ -641,7 +685,9 @@ def test_setThickness(): assert lineTh["thickness"] == [1.0, 0.7] assert lineTh["thicknessSource"] == ["shp file", "shp file"] - assert np.array_equal(lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 
20.0, 20.0])) + assert np.array_equal( + lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0]) + ) # call function to be tested cfg["GENERAL"]["entThFromFile"] = "True" @@ -661,7 +707,9 @@ def test_setThickness(): assert lineTh["thickness"] == [1.2, 0.7] assert lineTh["thicknessSource"] == ["shp file", "shp file"] - assert np.array_equal(lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0])) + assert np.array_equal( + lineTh["x"], np.asarray([0, 10.0, 10.0, 0.0, 0.0, 20.0, 26.0, 26.0, 20.0, 20.0]) + ) def test_createReportDict(): @@ -698,7 +746,9 @@ def test_createReportDict(): } # call function to be tested - reportST = com1DFA.createReportDict(avaDir, logName, relName, inputSimLines, cfg, reportAreaInfo) + reportST = com1DFA.createReportDict( + avaDir, logName, relName, inputSimLines, cfg, reportAreaInfo + ) assert "Simulation Parameters" in reportST assert "Program version" in reportST["Simulation Parameters"] @@ -1071,12 +1121,28 @@ def test_initializeMesh(): assert np.all(np.isnan(dem["rasterData"][0:5, 4])) assert abs(dem["Nx"][2, 2]) == abs(dem["Nz"][2, 2]) assert np.isclose(dem["areaRaster"][2, 2], demTest["areaRaster"][2, 2]) - assert dem["headerNeighbourGrid"]["xllcenter"] == demTest["headerNeighbourGrid"]["xllcenter"] - assert dem["headerNeighbourGrid"]["yllcenter"] == demTest["headerNeighbourGrid"]["yllcenter"] - assert dem["headerNeighbourGrid"]["ncols"] == demTest["headerNeighbourGrid"]["ncols"] - assert dem["headerNeighbourGrid"]["nrows"] == demTest["headerNeighbourGrid"]["nrows"] - assert dem["headerNeighbourGrid"]["cellsize"] == demTest["headerNeighbourGrid"]["cellsize"] - assert dem["headerNeighbourGrid"]["yllcenter"] == demTest["headerNeighbourGrid"]["yllcenter"] + assert ( + dem["headerNeighbourGrid"]["xllcenter"] + == demTest["headerNeighbourGrid"]["xllcenter"] + ) + assert ( + dem["headerNeighbourGrid"]["yllcenter"] + == demTest["headerNeighbourGrid"]["yllcenter"] + ) + assert ( + 
dem["headerNeighbourGrid"]["ncols"] == demTest["headerNeighbourGrid"]["ncols"] + ) + assert ( + dem["headerNeighbourGrid"]["nrows"] == demTest["headerNeighbourGrid"]["nrows"] + ) + assert ( + dem["headerNeighbourGrid"]["cellsize"] + == demTest["headerNeighbourGrid"]["cellsize"] + ) + assert ( + dem["headerNeighbourGrid"]["yllcenter"] + == demTest["headerNeighbourGrid"]["yllcenter"] + ) def test_getSimTypeList(): @@ -1086,10 +1152,14 @@ def test_getSimTypeList(): standardCfg = configparser.ConfigParser() standardCfg["GENERAL"] = {"secRelArea": "False"} simTypeList = ["ent", "res", "null", "available", "entres"] - inputSimFiles = {"entResInfo": {"flagEnt": "Yes", "flagRes": "Yes", "flagSecondaryRelease": "No"}} + inputSimFiles = { + "entResInfo": {"flagEnt": "Yes", "flagRes": "Yes", "flagSecondaryRelease": "No"} + } # call function to be tested - standardCfg, simTypeList = com1DFA.getSimTypeList(standardCfg, simTypeList, inputSimFiles) + standardCfg, simTypeList = com1DFA.getSimTypeList( + standardCfg, simTypeList, inputSimFiles + ) # setup test result simTypeListTest = ["ent", "null", "res", "entres"] @@ -1100,7 +1170,9 @@ def test_getSimTypeList(): # call function to be tested simTypeList = ["ent", "null", "available"] inputSimFiles["entResInfo"]["flagRes"] = "No" - standardCfg2, simTypeList2 = com1DFA.getSimTypeList(standardCfg, simTypeList, inputSimFiles) + standardCfg2, simTypeList2 = com1DFA.getSimTypeList( + standardCfg, simTypeList, inputSimFiles + ) # setup test result simTypeListTest2 = ["ent", "null"] @@ -1114,7 +1186,9 @@ def test_getSimTypeList(): simTypeList = ["res", "null", "available"] inputSimFiles["entResInfo"]["flagEnt"] = "No" inputSimFiles["entResInfo"]["flagRes"] = "Yes" - standardCfg3, simTypeList3 = com1DFA.getSimTypeList(standardCfg, simTypeList, inputSimFiles) + standardCfg3, simTypeList3 = com1DFA.getSimTypeList( + standardCfg, simTypeList, inputSimFiles + ) # setup test result simTypeListTest3 = ["res", "null"] @@ -1336,11 +1410,19 @@ def 
test_releaseSecRelArea(): # print("particles IN pytest socond", particles2) assert particles["nPart"] == 6 - assert np.array_equal(particles["x"], np.asarray([6.0, 7.0, 6.75, 7.25, 6.75, 7.25])) - assert np.array_equal(particles["totalEnthalpy"], np.asarray([6.0, 7.0, pEnt, pEnt, pEnt, pEnt])) - assert np.array_equal(particles["y"], np.asarray([6.0, 7.0, 6.75, 6.75, 7.25, 7.25])) + assert np.array_equal( + particles["x"], np.asarray([6.0, 7.0, 6.75, 7.25, 6.75, 7.25]) + ) + assert np.array_equal( + particles["totalEnthalpy"], np.asarray([6.0, 7.0, pEnt, pEnt, pEnt, pEnt]) + ) + assert np.array_equal( + particles["y"], np.asarray([6.0, 7.0, 6.75, 6.75, 7.25, 7.25]) + ) assert np.array_equal(zPartArray0New, np.asarray([2, 3, 1.0, 1.0, 1.0, 1.0])) - assert np.array_equal(particles["m"], np.asarray([1250.0, 1250.0, 50.0, 50.0, 50.0, 50.0])) + assert np.array_equal( + particles["m"], np.asarray([1250.0, 1250.0, 50.0, 50.0, 50.0, 50.0]) + ) assert particles["mTot"] == 2700.0 assert particles2["nPart"] == 11 assert np.array_equal( @@ -1351,10 +1433,14 @@ def test_releaseSecRelArea(): particles2["y"], np.asarray([6.0, 7.0, 9.1, 6.75, 6.75, 7.25, 7.25, 8.75, 8.75, 9.25, 9.25]), ) - assert np.array_equal(zPartArray0New2, np.asarray([1, 2, 3, 1, 1, 1, 1, 1, 1, 1, 1])) + assert np.array_equal( + zPartArray0New2, np.asarray([1, 2, 3, 1, 1, 1, 1, 1, 1, 1, 1]) + ) assert np.array_equal( particles2["m"], - np.asarray([1250.0, 1250.0, 1250.0, 50.0, 50.0, 50.0, 50.0, 25.0, 25.0, 25.0, 25.0]), + np.asarray( + [1250.0, 1250.0, 1250.0, 50.0, 50.0, 50.0, 50.0, 25.0, 25.0, 25.0, 25.0] + ), ) assert particles2["mTot"] == 4050.0 @@ -1365,18 +1451,28 @@ def test_getRelThFromPart(): # setup required input cfg = configparser.ConfigParser() cfg["GENERAL"] = {"relThFromFile": "True", "relTh": ""} - inputSimLines = {"releaseLine": {"thickness": ["1.2", "1.5"], "id": ["0", "1"], "type": "Release"}} + inputSimLines = { + "releaseLine": { + "thickness": ["1.2", "1.5"], + "id": ["0", "1"], + "type": 
"Release", + } + } relThField = "" # call function to be tested - relThFromPart = com1DFA.getRelThFromPart(cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel") + relThFromPart = com1DFA.getRelThFromPart( + cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel" + ) assert relThFromPart == 1.5 cfg["GENERAL"]["relThFromFile"] = "False" cfg["GENERAL"]["relTh"] = "2.0" # call function to be tested - relThFromPart = com1DFA.getRelThFromPart(cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel") + relThFromPart = com1DFA.getRelThFromPart( + cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel" + ) assert relThFromPart == 2.0 @@ -1385,7 +1481,9 @@ def test_getRelThFromPart(): relThField = np.zeros((10, 10)) relThField[0:10, 1] = 10.0 # call function to be tested - relThFromPart = com1DFA.getRelThFromPart(cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel") + relThFromPart = com1DFA.getRelThFromPart( + cfg["GENERAL"], inputSimLines["releaseLine"], relThField, "rel" + ) assert relThFromPart == 10.0 @@ -1812,7 +1910,9 @@ def test_exportFields(tmp_path): fieldsList = [fields1, fields2, fields3, fields4, fields5] # call function to be tested - com1DFA.exportFields(cfg, 10.00, fields2, dem, outDir, logName, TSave="intermediate") + com1DFA.exportFields( + cfg, 10.00, fields2, dem, outDir, logName, TSave="intermediate" + ) com1DFA.exportFields(cfg, 40.00, fields5, dem, outDir, logName, TSave="final") # read fields @@ -1848,9 +1948,15 @@ def test_exportFields(tmp_path): cfg["REPORT"] = {} com1DFA.exportFields(cfg, 0.00, fields1, dem, outDir2, logName, TSave="initial") - com1DFA.exportFields(cfg, 10.00, fields2, dem, outDir2, logName, TSave="intermediate") - com1DFA.exportFields(cfg, 15.00, fields3, dem, outDir2, logName, TSave="intermediate") - com1DFA.exportFields(cfg, 25.00, fields4, dem, outDir2, logName, TSave="intermediate") + com1DFA.exportFields( + cfg, 10.00, fields2, dem, outDir2, logName, TSave="intermediate" + ) + 
com1DFA.exportFields( + cfg, 15.00, fields3, dem, outDir2, logName, TSave="intermediate" + ) + com1DFA.exportFields( + cfg, 25.00, fields4, dem, outDir2, logName, TSave="intermediate" + ) com1DFA.exportFields(cfg, 40.00, fields5, dem, outDir2, logName, TSave="final") # read fields @@ -2085,7 +2191,7 @@ def test_prepareVarSimDict(tmp_path, caplog): testCfg["GENERAL"]["avalancheDir"] = str(avaDir) simHash = cfgUtils.cfgHash(testCfg) - simName1 = "relAlr_" + simHash + "_C_L_entres_dfa" + simName1 = "relAlr_" + simHash + "_com1_C_L_entres_dfa" testDict = { simName1: { "simHash": simHash, @@ -2193,7 +2299,7 @@ def test_prepareVarSimDict(tmp_path, caplog): testCfg2["INPUT"]["resistanceScenario"] = str(pathlib.Path("RES", "entAlr.shp")) testCfg2["GENERAL"]["avalancheDir"] = str(avaDir) simHash2 = cfgUtils.cfgHash(testCfg2) - simName2 = "relAlr_" + simHash2 + "_C_L_entres_dfa" + simName2 = "relAlr_" + simHash2 + "_com1_C_L_entres_dfa" testDict2 = { simName2: { "simHash": simHash2, @@ -2519,7 +2625,9 @@ def test_runCom1DFA(tmp_path, caplog): "reportOneFile": "True", "debugPlot": "False", } - modCfg, modInfo = cfgUtils.getModuleConfig(com1DFA, fileOverride=cfgFile, modInfo=True) + modCfg, modInfo = cfgUtils.getModuleConfig( + com1DFA, fileOverride=cfgFile, modInfo=True + ) dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain(cfgMain, cfgInfo=cfgFile) @@ -2609,16 +2717,26 @@ def test_runCom1DFA(tmp_path, caplog): # print(simDF["simName"]) outDir = avaDir / "Outputs" / "com1DFA" for ext in ["ppr", "pft", "pfv"]: - assert (outDir / "peakFiles" / ("%s_%s.asc" % (simDF["simName"].iloc[0], ext))).is_file() - assert (outDir / "peakFiles" / ("%s_%s.asc" % (simDF["simName"].iloc[1], ext))).is_file() - - assert (outDir / "configurationFiles" / ("%s.ini" % (simDF["simName"].iloc[0]))).is_file() - assert (outDir / "configurationFiles" / ("%s.ini" % (simDF["simName"].iloc[1]))).is_file() + assert ( + outDir / "peakFiles" / ("%s_%s.asc" % (simDF["simName"].iloc[0], ext)) + 
).is_file() + assert ( + outDir / "peakFiles" / ("%s_%s.asc" % (simDF["simName"].iloc[1], ext)) + ).is_file() + + assert ( + outDir / "configurationFiles" / ("%s.ini" % (simDF["simName"].iloc[0])) + ).is_file() + assert ( + outDir / "configurationFiles" / ("%s.ini" % (simDF["simName"].iloc[1])) + ).is_file() assert (outDir / "configurationFiles" / ("allConfigurations.csv")).is_file() initProj.cleanModuleFiles(avaDir, com1DFA, deleteOutput=False) with caplog.at_level(logging.WARNING): - dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain(cfgMain, cfgInfo=cfgFile) + dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain( + cfgMain, cfgInfo=cfgFile + ) assert "There is no simulation to be performed" in caplog.text @@ -2635,7 +2753,9 @@ def test_runOrLoadCom1DFA(tmp_path, caplog): testDir = pathlib.Path(__file__).parents[0] avalancheDir = testDir / ".." / ".." / "benchmarks" / "avaHockeyChannelPytest" cfgMain = configparser.ConfigParser() - dem, simDF, resTypeList = com1DFA.runOrLoadCom1DFA(avalancheDir, cfgMain, runDFAModule=False, cfgFile="") + dem, simDF, resTypeList = com1DFA.runOrLoadCom1DFA( + avalancheDir, cfgMain, runDFAModule=False, cfgFile="" + ) # print(simDF.index) # print(simDF.columns) assert "pft" in resTypeList @@ -2674,7 +2794,9 @@ def test_fetchRelVolume(tmp_path): dem["rasterData"] = np.ones((10, 20)) demPath = pathlib.Path(avaDir, "Inputs", "testDem.asc") fU.makeADir(pathlib.Path(avaDir, "Inputs")) - IOf.writeResultToRaster(dem["header"], dem["rasterData"], demPath.parent / demPath.stem, flip=False) + IOf.writeResultToRaster( + dem["header"], dem["rasterData"], demPath.parent / demPath.stem, flip=False + ) # subprocess.run(["cat", demPath]) # write relThField @@ -2807,7 +2929,9 @@ def test_adaptDEM(): dem = geoTrans.getNormalMesh(dem, num=cfg["GENERAL"].getfloat("methodMeshNormal")) dem = DFAtls.getAreaMesh(dem, cfg["GENERAL"].getfloat("methodMeshNormal")) - _, _, NzNormed = DFAtls.normalize(dem["Nx"].copy(), dem["Ny"].copy(), 
dem["Nz"].copy()) + _, _, NzNormed = DFAtls.normalize( + dem["Nx"].copy(), dem["Ny"].copy(), dem["Nz"].copy() + ) demInput = dem.copy() fieldsInput = fields.copy() @@ -2942,17 +3066,21 @@ def test_tSteps_output_behavior(tmp_path, caplog): # Get main configuration cfgMain = cfgUtils.getGeneralConfig() - cfgMain['MAIN']['avalancheDir'] = str(avaDir1) + cfgMain["MAIN"]["avalancheDir"] = str(avaDir1) # Modify config to have empty tSteps and NO parameter variations cfg = cfgUtils.getModuleConfig(com1DFA, cfgFile1) cfg["GENERAL"]["tSteps"] = "" cfg["GENERAL"]["tEnd"] = "10" # Short simulation cfg["GENERAL"]["dt"] = "0.1" # Single value, no variations - cfg["GENERAL"]["simTypeList"] = "null" # Simple simulation, no entrainment/resistance + cfg["GENERAL"]["simTypeList"] = ( + "null" # Simple simulation, no entrainment/resistance + ) with open(cfgFile1, "w") as f: cfg.write(f) - dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain(cfgMain, cfgInfo=cfgFile1) + dem, plotDict, reportDictList, simDF = com1DFA.com1DFAMain( + cfgMain, cfgInfo=cfgFile1 + ) # Check that only final timestep files exist in timeSteps directory timeStepsDir1 = avaDir1 / "Outputs" / "com1DFA" / "peakFiles" / "timeSteps" @@ -2961,38 +3089,46 @@ def test_tSteps_output_behavior(tmp_path, caplog): # Should only have final timestep files (one per result type: ppr, pft, pfv) # Not initial timestep at t=0 for tFile in tStepFiles1: - assert "_t0.0" not in tFile.stem, f"Found initial timestep file {tFile} but tSteps was empty" + assert "_t0.0" not in tFile.stem, ( + f"Found initial timestep file {tFile} but tSteps was empty" + ) # Test 2: Explicit tSteps with t=0 should export t=0 timestep avaDir2 = pathlib.Path(tmp_path, "testExplicitTSteps") shutil.copytree(inputDir, avaDir2) cfgFile2 = avaDir2 / "test_com1DFACfg.ini" - cfgMain['MAIN']['avalancheDir'] = str(avaDir2) + cfgMain["MAIN"]["avalancheDir"] = str(avaDir2) # Modify config to have explicit tSteps including t=0 and NO parameter variations cfg2 = 
cfgUtils.getModuleConfig(com1DFA, cfgFile2) cfg2["GENERAL"]["tSteps"] = "0|5" cfg2["GENERAL"]["tEnd"] = "10" # Short simulation cfg2["GENERAL"]["dt"] = "0.1" # Single value, no variations - cfg2["GENERAL"]["simTypeList"] = "null" # Simple simulation, no entrainment/resistance + cfg2["GENERAL"]["simTypeList"] = ( + "null" # Simple simulation, no entrainment/resistance + ) with open(cfgFile2, "w") as f: cfg2.write(f) - dem2, plotDict2, reportDictList2, simDF2 = com1DFA.com1DFAMain(cfgMain, cfgInfo=cfgFile2) + dem2, plotDict2, reportDictList2, simDF2 = com1DFA.com1DFAMain( + cfgMain, cfgInfo=cfgFile2 + ) # Check that t=0 timestep files exist timeStepsDir2 = avaDir2 / "Outputs" / "com1DFA" / "peakFiles" / "timeSteps" assert timeStepsDir2.exists(), "timeSteps directory should exist" tStepFiles2 = list(timeStepsDir2.glob("*_t0.0*.asc")) - assert len(tStepFiles2) > 0, "Should have initial timestep files at t=0 when tSteps includes 0" + assert len(tStepFiles2) > 0, ( + "Should have initial timestep files at t=0 when tSteps includes 0" + ) # Test 3: exportData = False should trigger contour fetching in else block avaDir3 = pathlib.Path(tmp_path, "testExportDataFalse") shutil.copytree(inputDir, avaDir3) cfgFile3 = avaDir3 / "test_com1DFACfg.ini" - cfgMain['MAIN']['avalancheDir'] = str(avaDir3) + cfgMain["MAIN"]["avalancheDir"] = str(avaDir3) # Modify config to have exportData = False cfg3 = cfgUtils.getModuleConfig(com1DFA, cfgFile3) @@ -3004,13 +3140,163 @@ def test_tSteps_output_behavior(tmp_path, caplog): with open(cfgFile3, "w") as f: cfg3.write(f) - dem3, plotDict3, reportDictList3, simDF3 = com1DFA.com1DFAMain(cfgMain, cfgInfo=cfgFile3) + dem3, plotDict3, reportDictList3, simDF3 = com1DFA.com1DFAMain( + cfgMain, cfgInfo=cfgFile3 + ) # Check that contour data was generated (stored in reportDict) instead of exported files - assert len(reportDictList3) > 0, "Should have report dict even with exportData=False" + assert len(reportDictList3) > 0, ( + "Should have report dict 
even with exportData=False" + ) # Verify that timeSteps directory doesn't exist (no data exported) timeStepsDir3 = avaDir3 / "Outputs" / "com1DFA" / "peakFiles" / "timeSteps" if timeStepsDir3.exists(): tStepFiles3 = list(timeStepsDir3.glob("*.asc")) # With exportData=False, intermediate timesteps should not be exported - assert len(tStepFiles3) == 0, "No timestep files should be exported when exportData=False" + assert len(tStepFiles3) == 0, ( + "No timestep files should be exported when exportData=False" + ) + + +def test_getModuleNames(): + """Test getModuleNames function for extracting module names from call stack""" + from unittest.mock import patch, MagicMock + + # Test 1: Direct call from com1DFA module + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), + MagicMock(frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA"})), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com1DFA", "com1"), ( + f"Expected ('com1DFA', 'com1'), got {result}" + ) + + # Test 2: Call from wrapper module com5SnowSlide + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock( + f_globals={"__name__": "avaframe.com5SnowSlide.com5SnowSlide"} + ) + ), + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com5SnowSlide", "com5"), ( + f"Expected ('com5SnowSlide', 'com5'), got {result}" + ) + + # Test 3: Call from wrapper module com6RockAvalanche + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock( + f_globals={ + "__name__": "avaframe.com6RockAvalanche.com6RockAvalanche" + } + ) + ), + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com6RockAvalanche", 
"com6"), ( + f"Expected ('com6RockAvalanche', 'com6'), got {result}" + ) + + # Test 4: Call from wrapper module com8MoTPSA + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock( + f_globals={"__name__": "avaframe.com8MoTPSA.com8MoTPSA"} + ) + ), + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com8MoTPSA", "com8"), ( + f"Expected ('com8MoTPSA', 'com8'), got {result}" + ) + + # Test 5: Call from wrapper module com9MoTVoellmy + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock( + f_globals={"__name__": "avaframe.com9MoTVoellmy.com9MoTVoellmy"} + ) + ), + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com9MoTVoellmy", "com9"), ( + f"Expected ('com9MoTVoellmy', 'com9'), got {result}" + ) + + # Test 6: Non-com module (fallback to passed module) + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock(frame=MagicMock(f_globals={"__name__": "some.other.module"})), + MagicMock(frame=MagicMock(f_globals={"__name__": "another.module"})), + ] + # Create a mock module object + mock_module = MagicMock() + mock_module.__name__ = "avaframe.someModule" + result = com1DFA.getModuleNames(mock_module) + assert result == ("someModule", "someModule"), ( + f"Expected ('someModule', 'someModule'), got {result}" + ) + + # Test 7: Module without "com" prefix in name (fallback) + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock( + f_globals={"__name__": "avaframe.otherModule.otherModule"} + ) + ), + ] + # Create a mock module object + mock_module = MagicMock() + mock_module.__name__ = "avaframe.otherModule" + result = com1DFA.getModuleNames(mock_module) + assert result == ("otherModule", 
"otherModule"), ( + f"Expected ('otherModule', 'otherModule'), got {result}" + ) + + # Test 8: Deep call stack with multiple com modules (should pick first non-com1DFA.com1DFA) + with patch("inspect.stack") as mock_stack: + mock_stack.return_value = [ + MagicMock( + frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA.com1DFA"}) + ), # Should be skipped + MagicMock( + frame=MagicMock( + f_globals={"__name__": "avaframe.com5SnowSlide.com5SnowSlide"} + ) + ), # Should be picked + MagicMock( + frame=MagicMock( + f_globals={ + "__name__": "avaframe.com6RockAvalanche.com6RockAvalanche" + } + ) + ), # Should be ignored + MagicMock(frame=MagicMock(f_globals={"__name__": "avaframe.com1DFA"})), + ] + result = com1DFA.getModuleNames(com1DFA) + assert result == ("com5SnowSlide", "com5"), ( + f"Expected ('com5SnowSlide', 'com5'), got {result}" + ) diff --git a/avaframe/tests/test_scarp.py b/avaframe/tests/test_scarp.py index fbc40c61c..cb1b89071 100644 --- a/avaframe/tests/test_scarp.py +++ b/avaframe/tests/test_scarp.py @@ -90,7 +90,9 @@ def temp_output_dir(tmp_path): def test_readPerimeterSHP(scarp_test_data): """Test perimeter shapefile reading and rasterization""" # Get paths to test data - perimeterShp = scarp_test_data / "Inputs" / "POLYGONS" / "scarpFluchthorn_perimeter.shp" + perimeterShp = ( + scarp_test_data / "Inputs" / "POLYGONS" / "scarpFluchthorn_perimeter.shp" + ) demPath = scarp_test_data / "Inputs" / "fluchthorn.tif" # Read DEM to get transform and shape @@ -104,7 +106,9 @@ def test_readPerimeterSHP(scarp_test_data): # Assertions assert periData.shape == elevShape, "Perimeter shape should match DEM shape" assert periData.dtype == np.uint8, "Perimeter should be uint8 type" - assert np.all((periData == 0) | (periData == 1)), "Perimeter should only contain 0 and 1" + assert np.all((periData == 0) | (periData == 1)), ( + "Perimeter should only contain 0 and 1" + ) assert np.sum(periData) > 0, "Perimeter should contain some pixels marked as 1" assert 
np.sum(periData) < periData.size, "Perimeter should not mark all pixels" @@ -121,9 +125,13 @@ def test_plane_parameter_extraction(scarp_test_data): planesSlope = list(map(float, SHPdata["slopeangle"])) # Assertions - assert len(planesZseed) == SHPdata["nFeatures"], "Should have zseed for each feature" + assert len(planesZseed) == SHPdata["nFeatures"], ( + "Should have zseed for each feature" + ) assert len(planesDip) == SHPdata["nFeatures"], "Should have dip for each feature" - assert len(planesSlope) == SHPdata["nFeatures"], "Should have slope for each feature" + assert len(planesSlope) == SHPdata["nFeatures"], ( + "Should have slope for each feature" + ) assert SHPdata["nFeatures"] == 2, "Test data should have 2 features" # Build feature string @@ -139,7 +147,9 @@ def test_plane_parameter_extraction(scarp_test_data): features = ",".join(map(str, planeFeatures)) # Should have 5 parameters per feature - assert len(planeFeatures) == SHPdata["nFeatures"] * 5, "Should have 5 params per plane" + assert len(planeFeatures) == SHPdata["nFeatures"] * 5, ( + "Should have 5 params per plane" + ) assert len(features) > 0, "Feature string should not be empty" assert features.count(",") == len(planeFeatures) - 1, "Comma count should match" @@ -163,7 +173,9 @@ def test_plane_geometry_calculations(): west, north = 150.0, 250.0 # Point coordinates # Plane equation: z = zSeed + (north - ySeed) * betaY - (west - xSeed) * betaX - scarpVal = zSeed + (north - ySeed) * expected_betaY - (west - xSeed) * expected_betaX + scarpVal = ( + zSeed + (north - ySeed) * expected_betaY - (west - xSeed) * expected_betaX + ) # Manual calculation expected_scarpVal = 1000.0 + (50.0 * expected_betaY) - (50.0 * expected_betaX) @@ -171,7 +183,9 @@ def test_plane_geometry_calculations(): assert abs(scarpVal - expected_scarpVal) < 0.001, "Plane equation should be correct" -def test_calculateScarpWithPlanes_single_plane(mock_dem, mock_perimeter, mock_transform): +def 
test_calculateScarpWithPlanes_single_plane( + mock_dem, mock_perimeter, mock_transform +): """Test plane-based scarp calculation with single plane""" # Create a simple plane definition # Place seed point at center of grid with known parameters @@ -182,7 +196,9 @@ def test_calculateScarpWithPlanes_single_plane(mock_dem, mock_perimeter, mock_tr planes = f"{xSeed},{ySeed},{zSeed},{dip},{slope}" # Call function under test - scarpData = scarp.calculateScarpWithPlanes(mock_dem, mock_perimeter, mock_transform, planes) + scarpData = scarp.calculateScarpWithPlanes( + mock_dem, mock_perimeter, mock_transform, planes + ) # Assertions assert scarpData.shape == mock_dem.shape, "Scarp should have same shape as DEM" @@ -190,18 +206,20 @@ def test_calculateScarpWithPlanes_single_plane(mock_dem, mock_perimeter, mock_tr # Outside perimeter, scarp should equal DEM outside_mask = mock_perimeter == 0 - assert np.allclose( - scarpData[outside_mask], mock_dem[outside_mask] - ), "Outside perimeter, scarp should equal DEM" + assert np.allclose(scarpData[outside_mask], mock_dem[outside_mask]), ( + "Outside perimeter, scarp should equal DEM" + ) # Inside perimeter, scarp should be <= DEM inside_mask = mock_perimeter > 0 - assert np.all( - scarpData[inside_mask] <= mock_dem[inside_mask] + 0.001 - ), "Inside perimeter, scarp should not exceed DEM" + assert np.all(scarpData[inside_mask] <= mock_dem[inside_mask] + 0.001), ( + "Inside perimeter, scarp should not exceed DEM" + ) -def test_calculateScarpWithPlanes_multiple_planes(mock_dem, mock_perimeter, mock_transform): +def test_calculateScarpWithPlanes_multiple_planes( + mock_dem, mock_perimeter, mock_transform +): """Test plane calculation with multiple planes (maximum selection)""" # Create two planes with different seed points # Plane 1 @@ -215,16 +233,18 @@ def test_calculateScarpWithPlanes_multiple_planes(mock_dem, mock_perimeter, mock planes = f"{xSeed1},{ySeed1},{zSeed1},{dip1},{slope1},{xSeed2},{ySeed2},{zSeed2},{dip2},{slope2}" # Call 
function under test - scarpData = scarp.calculateScarpWithPlanes(mock_dem, mock_perimeter, mock_transform, planes) + scarpData = scarp.calculateScarpWithPlanes( + mock_dem, mock_perimeter, mock_transform, planes + ) # Assertions assert scarpData.shape == mock_dem.shape, "Scarp should have same shape as DEM" # Outside perimeter, scarp should equal DEM outside_mask = mock_perimeter == 0 - assert np.allclose( - scarpData[outside_mask], mock_dem[outside_mask] - ), "Outside perimeter, scarp should equal DEM" + assert np.allclose(scarpData[outside_mask], mock_dem[outside_mask]), ( + "Outside perimeter, scarp should equal DEM" + ) def test_calculateScarpWithPlanes_edge_cases(mock_dem, mock_perimeter, mock_transform): @@ -234,17 +254,23 @@ def test_calculateScarpWithPlanes_edge_cases(mock_dem, mock_perimeter, mock_tran dip, slope = 0.0, 0.0 # Zero slope planes = f"{xSeed},{ySeed},{zSeed},{dip},{slope}" - scarpData = scarp.calculateScarpWithPlanes(mock_dem, mock_perimeter, mock_transform, planes) + scarpData = scarp.calculateScarpWithPlanes( + mock_dem, mock_perimeter, mock_transform, planes + ) # With zero slope, plane should be horizontal at zSeed inside_mask = mock_perimeter > 0 expected = np.minimum(mock_dem[inside_mask], zSeed) - assert np.allclose(scarpData[inside_mask], expected), "Zero slope should create horizontal plane" + assert np.allclose(scarpData[inside_mask], expected), ( + "Zero slope should create horizontal plane" + ) # Test case 2: Vertical dip (90 degrees) dip, slope = 90.0, 10.0 planes = f"{xSeed},{ySeed},{zSeed},{dip},{slope}" - scarpData = scarp.calculateScarpWithPlanes(mock_dem, mock_perimeter, mock_transform, planes) + scarpData = scarp.calculateScarpWithPlanes( + mock_dem, mock_perimeter, mock_transform, planes + ) # Should not crash and produce valid output assert scarpData.shape == mock_dem.shape, "Should handle 90 degree dip" @@ -256,7 +282,9 @@ def test_calculateScarpWithPlanes_edge_cases(mock_dem, mock_perimeter, mock_tran # 
============================================================================ -def test_scarpAnalysisMain_plane_method(scarp_test_data, scarp_config, tmp_path, caplog): +def test_scarpAnalysisMain_plane_method( + scarp_test_data, scarp_config, tmp_path, caplog +): """End-to-end test using plane method with real test data""" # Set caplog to capture INFO level logs caplog.set_level("INFO") @@ -285,14 +313,18 @@ def test_scarpAnalysisMain_plane_method(scarp_test_data, scarp_config, tmp_path, assert src.height == 220, "Output should have correct height" assert src.width == 300, "Output should have correct width" - assert np.all(np.isfinite(scarp_data[scarp_data != src.nodata])), "Scarp data should be finite" + assert np.all(np.isfinite(scarp_data[scarp_data != src.nodata])), ( + "Scarp data should be finite" + ) with rasterio.open(hrel_file) as src: hrel_data = src.read(1) # hRel should be non-negative where valid (DEM - scarp >= 0) valid_mask = hrel_data != src.nodata - assert np.all(hrel_data[valid_mask] >= -0.001), "hRel values should be non-negative" + assert np.all(hrel_data[valid_mask] >= -0.001), ( + "hRel values should be non-negative" + ) # Check logging output assert "Perimeterfile is:" in caplog.text, "Should log perimeter file" @@ -304,7 +336,9 @@ def test_scarpAnalysisMain_plane_method(scarp_test_data, scarp_config, tmp_path, # ============================================================================ -def test_scarpAnalysisMain_missing_perimeter_file(scarp_config, temp_output_dir, caplog): +def test_scarpAnalysisMain_missing_perimeter_file( + scarp_config, temp_output_dir, caplog +): """Test error handling when perimeter shapefile is missing""" # Create a minimal test setup with missing perimeter file # Create a dummy DEM @@ -320,12 +354,14 @@ def test_scarpAnalysisMain_missing_perimeter_file(scarp_config, temp_output_dir, scarp.scarpAnalysisMain(scarp_config, str(temp_output_dir)) # Check that error was logged - assert ( - "not found" in 
caplog.text.lower() or "error" in caplog.text.lower() - ), "Should log error about missing file" + assert "not found" in caplog.text.lower() or "error" in caplog.text.lower(), ( + "Should log error about missing file" + ) -def test_scarpAnalysisMain_invalid_shapefile_attributes(scarp_config, temp_output_dir, caplog): +def test_scarpAnalysisMain_invalid_shapefile_attributes( + scarp_config, temp_output_dir, caplog +): """Test validation of shapefile attributes for plane method""" # This test would require creating a shapefile with missing attributes # For now, we test the error path by checking the ValueError is raised @@ -339,7 +375,9 @@ def test_scarpAnalysisMain_invalid_shapefile_attributes(scarp_config, temp_outpu assert True, "Attribute validation tested through code inspection" -def test_scarpAnalysisMain_useShapefiles_false(scarp_config, scarp_test_data, tmp_path, caplog): +def test_scarpAnalysisMain_useShapefiles_false( + scarp_config, scarp_test_data, tmp_path, caplog +): """Test configuration validation when useShapefiles is False""" # Copy test data to temporary directory to have a valid DEM test_dir = tmp_path / "scarpTestNoShapefiles" @@ -356,7 +394,9 @@ def test_scarpAnalysisMain_useShapefiles_false(scarp_config, scarp_test_data, tm pass # Expected to fail after logging error # Check error message was logged - assert "Shapefile option not selected" in caplog.text, "Should log error about shapefile option" + assert "Shapefile option not selected" in caplog.text, ( + "Should log error about shapefile option" + ) def test_scarpAnalysisMain_invalid_method(scarp_test_data, scarp_config, tmp_path): @@ -390,3 +430,41 @@ def test_scarpAnalysisMain_missing_required_attributes(scarp_test_data, tmp_path # The test data has correct attributes, so we can't test the error path easily # without creating invalid shapefiles. We document this limitation. 
     assert True, "Error path for missing attributes tested through code inspection"
+
+
+def test_error_message_attribute_names():
+    """Test that error messages reference correct attribute names"""
+    # This test verifies that error messages in scarp.py reference
+    # the same attribute names that are actually used in the code
+
+    import avaframe.com6RockAvalanche.scarp as scarp_module
+
+    # Read the scarp.py file to check error messages
+    scarp_path = pathlib.Path(scarp_module.__file__)
+    scarp_content = scarp_path.read_text()
+
+    # Check line 90 error message
+    line_90_match = None
+    for i, line in enumerate(scarp_content.split("\n"), 1):
+        if i == 90:
+            line_90_match = line
+            break
+
+    # The error message should reference 'dipAngle' not 'dipangle'
+    if line_90_match:
+        assert "'dipAngle'" in line_90_match or "'dipangle'" in line_90_match, (
+            f"Line 90 should reference dipAngle attribute: {line_90_match}"
+        )
+
+    # Check line 121 error message
+    line_121_match = None
+    for i, line in enumerate(scarp_content.split("\n"), 1):
+        if i == 121:
+            line_121_match = line
+            break
+
+    # The error message should reference 'rotAngle' not 'rotangle'
+    if line_121_match:
+        assert "'rotAngle'" in line_121_match or "'rotangle'" in line_121_match, (
+            f"Line 121 should reference rotAngle attribute: {line_121_match}"
+        )
diff --git a/docs/moduleCom1DFA.rst b/docs/moduleCom1DFA.rst
index 594d48db2..7dc6e9f5f 100644
--- a/docs/moduleCom1DFA.rst
+++ b/docs/moduleCom1DFA.rst
@@ -271,11 +271,13 @@ Using the default configuration, the simulation results are saved to: *Outputs/c
 to be resumed without re-running simulations that have already been performed. For this, just restart the run.
The naming of the output files has the following structure, shown with the example of -*relAlr_ff5f9b78c6_C_L_null_dfa_ppr*: +*relAlr_ff5f9b78c6_com1_C_L_null_dfa_ppr*: * *relAlr* - release area name, usually the name of the shapefile * *ff5f9b78c6* - individual hash of the configuration file used for the simulation. All files related to this simulation have the same hash in their name. This allows to identify which files belong to which simulation. +* *com1* - short module name (com1 for com1DFA, com2 for com2AB, etc.). This component was added in 2025-12 + to support better filtering and organization of simulations from different computational modules. * *C* - indicator of the setup used: D for default setup, C for custom setup, i.e. something was changed in the configuration file * *L* - indicator of the size category used for the friction model: L for large, M for medium, S for small @@ -283,6 +285,9 @@ The naming of the output files has the following structure, shown with the examp * *dfa* - indicator of the simulation type: dfa for dense flow avalanche * *ppr* - indicator of the result type: ppr for peak pressure, pfv for peak flow velocity, pft for peak flow thickness, etc +**Note:** Older simulations may not have the module name component (*com1*). The system automatically detects +and handles both formats for backward compatibility. + Optional outputs diff --git a/pyproject.toml b/pyproject.toml index 0791b0b29..401a71199 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,11 @@ python = "==3.13.7" #[tool.pixi.feature.rcs.pypi-dependencies] #avaframe = "==1.13rc4" +[tool.pixi.tasks] +build = "python setup.py build_ext --inplace" +clean = "find avaframe -type f \\( -name '*.so' -o -name '*.c' \\) -delete" +rebuild = { depends-on = ["clean", "build"] } + #Feature qgis [tool.pixi.feature.qgis.dependencies] numpy = "<2.0"