from __future__ import annotations

import pickle
from typing import Any

from dpdata.format import Format
from dpdata.unit import EnergyConversion

# Fraction of frames written to the "training" list when none is given.
_DEFAULT_TRAIN_SIZE = 0.8


def _frames_to_structures(data: dict, extra_fields: dict | None = None) -> list[dict]:
    """Convert one LabeledSystem data dict into a list of FeNNol structures.

    Parameters
    ----------
    data : dict
        LabeledSystem data; the keys ``atom_names``, ``atom_types``,
        ``coords``, ``energies`` and ``forces`` are read.
    extra_fields : dict, optional
        Additional key/value pairs copied into every structure
        (used to tag frames with their system name).

    Returns
    -------
    list[dict]
        One dictionary per frame with the keys ``species``, ``coordinates``,
        ``formation_energy``, ``shifted_energy`` and ``forces``, plus
        everything in ``extra_fields``.
    """
    # A single factor serves energies and forces: only the energy unit
    # changes (eV -> kcal/mol); coordinates are passed through unchanged.
    conv = EnergyConversion("eV", "kcal_mol").value()

    atom_names = data["atom_names"]
    species = [atom_names[atom_type] for atom_type in data["atom_types"]]
    energies = data["energies"]
    forces = data["forces"]

    structures = []
    for frame_idx, frame_coords in enumerate(data["coords"]):
        energy = energies[frame_idx] * conv
        structure = {
            # Each frame gets its own list so frames do not alias one
            # another after the pickle is loaded.
            "species": list(species),
            "coordinates": frame_coords.copy(),
            "formation_energy": energy,
            # No reference-energy shift is applied, so both energies agree.
            "shifted_energy": energy,
            "forces": forces[frame_idx] * conv,
        }
        if extra_fields:
            structure.update(extra_fields)
        structures.append(structure)
    return structures


def _dump_split(filename: str, structures: list[dict], train_size: float, prefix: str):
    """Split structures into training/validation lists and pickle them.

    The split is positional: the first ``int(len(structures) * train_size)``
    structures become the training set and the remainder the validation set.

    Parameters
    ----------
    filename : str
        Output pickle file name.
    structures : list[dict]
        Structures in the order they should be split.
    train_size : float
        Fraction of structures used for training; must lie in [0, 1].
    prefix : str
        Leading part of the ``description`` entry; the frame counts are
        appended to it.

    Raises
    ------
    ValueError
        If ``train_size`` is outside [0, 1]. Without this check a negative
        value would silently slice from the end of the list and a value
        above 1 would report a negative validation count.
    """
    if not 0.0 <= train_size <= 1.0:
        raise ValueError(f"train_size must be within [0, 1], got {train_size!r}")

    n_total = len(structures)
    n_train = int(n_total * train_size)
    fennol_data = {
        "training": structures[:n_train],
        "validation": structures[n_train:],
        "description": f"{prefix} {n_total} frames, {n_train} training, {n_total - n_train} validation",
    }

    with open(filename, "wb") as f:
        pickle.dump(fennol_data, f)


class _MultiSystemCollector:
    """Collect data from multiple systems and write them to one FeNNol file.

    The collector is handed to ``FeNNolFormat.to_labeled_system`` in place of
    a file name. Once ``expected_count`` systems have been added, the
    combined pickle file is written.
    """

    def __init__(self, filename: str, **kwargs):
        self.filename = filename
        self.kwargs = kwargs
        self.systems_data = []
        self.expected_count = 0
        self.processed_count = 0

    def add_system_data(self, system_name: str, data: dict):
        """Add data from a single system; write the file after the last one."""
        self.systems_data.append((system_name, data))
        self.processed_count += 1

        # Write combined data when all systems are processed
        if self.processed_count == self.expected_count:
            self._write_combined_data()

    def _write_combined_data(self):
        """Combine data from all systems and write to FeNNol format."""
        if not self.systems_data:
            return

        all_structures = []
        for system_name, data in self.systems_data:
            # Track which system each frame came from
            all_structures.extend(
                _frames_to_structures(data, {"system_name": system_name})
            )

        _dump_split(
            self.filename,
            all_structures,
            self.kwargs.get("train_size", _DEFAULT_TRAIN_SIZE),
            f"Generated from dpdata MultiSystems with {len(self.systems_data)} systems,",
        )


@Format.register("fennol")
class FeNNolFormat(Format):
    """The FeNNol format plugin for dpdata.

    FeNNol (https://github.com/thomasple/FeNNol/) uses a pickle format
    for training machine learning models. This plugin supports exporting
    dpdata LabeledSystem to FeNNol format.

    The format consists of a dictionary with 'training' and 'validation' keys,
    where each contains a list of structures with:
    - 'species': atomic species/elements
    - 'coordinates': atomic positions in Angstroms
    - 'formation_energy': energy in kcal/mol
    - 'shifted_energy': energy in kcal/mol (same as formation_energy in this implementation)
    - 'forces': atomic forces in kcal/mol/Angstrom

    Structures written through MultiSystems additionally carry a
    'system_name' entry. The training/validation split is positional
    (no shuffling).

    Examples
    --------
    Export a LabeledSystem to FeNNol format:

    >>> import dpdata
    >>> ls = dpdata.LabeledSystem("OUTCAR", fmt="vasp/outcar")
    >>> ls.to("fennol", "data.pkl")

    Export multiple systems to a single FeNNol file:

    >>> ms = dpdata.MultiSystems(ls1, ls2)
    >>> ms.to("fennol", "combined_data.pkl")
    """

    def __init__(self):
        super().__init__()
        self._multi_collector = None

    def to_multi_systems(self, formulas: list[str], directory: str, **kwargs: Any):
        """Generate collectors for writing multiple systems to the same FeNNol file.

        Parameters
        ----------
        formulas : list[str]
            formulas/names of systems
        directory : str
            FeNNol pickle file name
        **kwargs : dict
            other parameters (e.g., train_size)

        Yields
        ------
        _MultiSystemCollector
            collector object that systems will write their data to
        """
        # Create shared collector for all systems
        collector = _MultiSystemCollector(directory, **kwargs)
        collector.expected_count = len(formulas)
        self._multi_collector = collector

        # Yield the same collector once per system
        for _ in formulas:
            yield collector

    def to_labeled_system(self, data, file_name, train_size=0.8, **kwargs):
        """Convert dpdata LabeledSystem to FeNNol format.

        Parameters
        ----------
        data : dict
            LabeledSystem data
        file_name : str or _MultiSystemCollector
            Output pickle file name or multi-system collector
        train_size : float, optional
            Fraction of data to use for training (default: 0.8). Ignored when
            ``file_name`` is a collector, which uses the ``train_size`` it
            was created with.
        **kwargs : dict
            Other parameters; ``system_name`` labels the data when writing
            to a collector.

        Raises
        ------
        ValueError
            If ``train_size`` is outside [0, 1] (single-system export).
        """
        # Called from MultiSystems: hand the data to the collector instead of
        # writing directly.
        if isinstance(file_name, _MultiSystemCollector):
            system_name = kwargs.get("system_name", "unnamed_system")
            file_name.add_system_data(system_name, data)
            return

        _dump_split(
            file_name,
            _frames_to_structures(data),
            train_size,
            "Generated from dpdata with",
        )
from __future__ import annotations

import os
import pickle
import tempfile
import unittest

import numpy as np
from context import dpdata

# eV -> kcal/mol factor the exporter is expected to apply (1 eV ~ 23.06 kcal/mol).
EV_TO_KCAL_MOL = 23.06054783061903

# Keys that vary per frame and must be sliced when cutting a system down.
PER_FRAME_KEYS = ("coords", "cells", "energies", "forces")

# Keys every exported structure must carry.
STRUCTURE_KEYS = {
    "species",
    "coordinates",
    "formation_energy",
    "shifted_energy",
    "forces",
}


class TestFeNNolFormat(unittest.TestCase):
    """Tests for exporting LabeledSystem and MultiSystems to the FeNNol pickle format."""

    def setUp(self):
        """Set up test fixtures with a simple water molecule system."""
        # Create a simple test system: water molecule (H2O)
        self.test_data = {
            "atom_names": ["H", "O"],
            "atom_numbs": [2, 1],
            "atom_types": np.array([0, 1, 0]),  # H, O, H
            "coords": np.array(
                [
                    [[0.0, 0.0, 0.0], [0.0, 0.0, 1.0], [0.0, 1.0, 0.0]],  # frame 1
                    [[0.1, 0.0, 0.0], [0.0, 0.1, 1.0], [0.0, 1.1, 0.0]],  # frame 2
                ]
            ),  # 2 frames, 3 atoms, 3 coords
            "cells": np.array(
                [
                    [[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]],  # frame 1
                    [[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]],  # frame 2
                ]
            ),  # 2 frames, 3x3 cell
            "energies": np.array([-1.0, -1.1]),  # 2 frame energies in eV
            "forces": np.array(
                [
                    [[0.1, 0.0, 0.0], [0.0, 0.1, 0.0], [-0.1, -0.1, 0.0]],  # frame 1
                    [[0.2, 0.0, 0.0], [0.0, 0.2, 0.0], [-0.2, -0.2, 0.0]],  # frame 2
                ]
            ),  # 2 frames, 3 atoms, 3 force components in eV/Angstrom
            "orig": np.array([0.0, 0.0, 0.0]),
            "nopbc": False,
        }

        self.system = dpdata.LabeledSystem(data=self.test_data)

    def _export_and_load(self, system, **export_kwargs):
        """Export ``system`` to a fresh FeNNol pickle file and return its content.

        The file lives in a new temporary directory, so it does not exist
        before the export and the existence assertion is meaningful. The
        directory is removed when the helper returns.
        """
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, "fennol.pkl")
            system.to("fennol", path, **export_kwargs)

            # Check that the export actually created the file
            self.assertTrue(os.path.exists(path))

            with open(path, "rb") as f:
                return pickle.load(f)

    def test_fennol_export(self):
        """Test basic FeNNol format export functionality."""
        fennol_data = self._export_and_load(self.system)

        # Check main structure
        self.assertIn("training", fennol_data)
        self.assertIn("validation", fennol_data)
        self.assertIn("description", fennol_data)

        training = fennol_data["training"]
        validation = fennol_data["validation"]

        # Default train_size=0.8 with 2 frames: int(2 * 0.8) = 1 training frame
        self.assertEqual(len(training), 1)
        self.assertEqual(len(validation), 1)

        # Check structure of training data
        sample = training[0]
        self.assertEqual(set(sample.keys()), STRUCTURE_KEYS)

        # Check species
        self.assertEqual(sample["species"], ["H", "O", "H"])

        # Check coordinates (should be unchanged from Angstroms)
        np.testing.assert_array_almost_equal(
            sample["coordinates"], self.test_data["coords"][0]
        )

        # Check energy conversion (eV to kcal/mol)
        expected_energy = self.test_data["energies"][0] * EV_TO_KCAL_MOL
        self.assertAlmostEqual(sample["formation_energy"], expected_energy, places=5)
        self.assertAlmostEqual(sample["shifted_energy"], expected_energy, places=5)

        # Check forces conversion
        expected_forces = self.test_data["forces"][0] * EV_TO_KCAL_MOL
        np.testing.assert_array_almost_equal(
            sample["forces"], expected_forces, decimal=5
        )

    def test_fennol_export_custom_train_size(self):
        """Test FeNNol export with custom training size."""
        # train_size=0.5 with 2 frames: 1 training, 1 validation
        fennol_data = self._export_and_load(self.system, train_size=0.5)

        self.assertEqual(len(fennol_data["training"]), 1)
        self.assertEqual(len(fennol_data["validation"]), 1)

    def test_fennol_export_all_training(self):
        """Test FeNNol export with all data as training."""
        # train_size=1.0: all training, no validation
        fennol_data = self._export_and_load(self.system, train_size=1.0)

        self.assertEqual(len(fennol_data["training"]), 2)
        self.assertEqual(len(fennol_data["validation"]), 0)

    def test_fennol_single_frame(self):
        """Test FeNNol export with single frame."""
        # Keep only the first frame of every per-frame array
        single_frame_data = {
            key: (value[:1] if key in PER_FRAME_KEYS else value)
            for key, value in self.test_data.items()
        }
        single_system = dpdata.LabeledSystem(data=single_frame_data)

        fennol_data = self._export_and_load(single_system)

        # With 1 frame and train_size=0.8, int(1 * 0.8) = 0 training frames
        self.assertEqual(len(fennol_data["training"]), 0)
        self.assertEqual(len(fennol_data["validation"]), 1)

    def test_fennol_multi_systems_export(self):
        """Test FeNNol format export with MultiSystems."""
        # Create a second test system: CO molecule
        test_data2 = {
            "atom_names": ["C", "O"],
            "atom_numbs": [1, 1],
            "atom_types": np.array([0, 1]),  # C, O
            "coords": np.array(
                [[[0.0, 0.0, 0.0], [1.2, 0.0, 0.0]]]  # 1 frame, 2 atoms, 3 coords
            ),
            "cells": np.array(
                [[[10.0, 0.0, 0.0], [0.0, 10.0, 0.0], [0.0, 0.0, 10.0]]]  # 1 frame
            ),
            "energies": np.array([-2.5]),  # 1 frame energy in eV
            "forces": np.array(
                [[[0.05, 0.0, 0.0], [-0.05, 0.0, 0.0]]]  # 1 frame, 2 atoms
            ),
            "orig": np.array([0.0, 0.0, 0.0]),
            "nopbc": False,
        }
        system2 = dpdata.LabeledSystem(data=test_data2)

        # Create MultiSystems with both systems
        multi_systems = dpdata.MultiSystems(self.system, system2)

        fennol_data = self._export_and_load(multi_systems)

        # Check main structure
        self.assertIn("training", fennol_data)
        self.assertIn("validation", fennol_data)
        self.assertIn("description", fennol_data)

        # Should have 3 total frames (2 from first system + 1 from second)
        all_samples = fennol_data["training"] + fennol_data["validation"]
        self.assertEqual(len(all_samples), 3)

        # Every sample must carry the full key set, including its system name
        expected_keys = STRUCTURE_KEYS | {"system_name"}
        for sample in all_samples:
            self.assertEqual(set(sample.keys()), expected_keys)

        # Should have data from both systems
        system_names = {sample["system_name"] for sample in all_samples}
        self.assertEqual(len(system_names), 2)

    def test_fennol_multi_systems_custom_train_size(self):
        """Test FeNNol MultiSystems export with custom training size."""
        # Create a simple second system
        test_data2 = {
            "atom_names": ["C"],
            "atom_numbs": [1],
            "atom_types": np.array([0]),  # C
            "coords": np.array([[[0.0, 0.0, 0.0]]]),  # 1 frame, 1 atom
            "cells": np.array([[[5.0, 0.0, 0.0], [0.0, 5.0, 0.0], [0.0, 0.0, 5.0]]]),
            "energies": np.array([-0.5]),  # 1 frame
            "forces": np.array([[[0.0, 0.0, 0.0]]]),  # 1 frame, 1 atom
            "orig": np.array([0.0, 0.0, 0.0]),
            "nopbc": False,
        }
        system2 = dpdata.LabeledSystem(data=test_data2)
        multi_systems = dpdata.MultiSystems(self.system, system2)

        # train_size=1.0: all 3 frames as training, 0 validation
        fennol_data = self._export_and_load(multi_systems, train_size=1.0)

        self.assertEqual(len(fennol_data["training"]), 3)
        self.assertEqual(len(fennol_data["validation"]), 0)


if __name__ == "__main__":
    unittest.main()