-
Notifications
You must be signed in to change notification settings - Fork 11k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Embed files #8121
base: master
Are you sure you want to change the base?
Embed files #8121
Changes from 6 commits
b096272
e456204
ae6a22b
f1348e2
562d4f2
096c3da
2bad597
48856e1
15c309c
a4289e2
01b463f
287f7db
9c972aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
#!/usr/bin/env python3 | ||
# gguf-addfile.py srcfile dstfile addfiles ... | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
import argparse | ||
import os | ||
import sys | ||
from pathlib import Path | ||
from typing import Any | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
|
||
# Necessary to load the local gguf package | ||
if "NO_LOCAL_GGUF" not in os.environ and (Path(__file__).parent.parent.parent / 'gguf-py').exists(): | ||
sys.path.insert(0, str(Path(__file__).parent.parent)) | ||
|
||
from gguf import GGUFReader, GGUFWriter, ReaderField, GGMLQuantizationType, GGUFEndian, GGUFValueType, Keys # noqa: E402 | ||
|
||
logger = logging.getLogger("gguf-addfile") | ||
|
||
|
||
def get_file_host_endian(reader: GGUFReader) -> tuple[str, str]: | ||
host_endian = 'LITTLE' if np.uint32(1) == np.uint32(1).newbyteorder("<") else 'BIG' | ||
if reader.byte_order == 'S': | ||
file_endian = 'BIG' if host_endian == 'LITTLE' else 'LITTLE' | ||
else: | ||
file_endian = host_endian | ||
return (host_endian, file_endian) | ||
|
||
|
||
def get_byteorder(reader: GGUFReader) -> GGUFEndian: | ||
if np.uint32(1) == np.uint32(1).newbyteorder("<"): | ||
# Host is little endian | ||
host_endian = GGUFEndian.LITTLE | ||
swapped_endian = GGUFEndian.BIG | ||
else: | ||
# Sorry PDP or other weird systems that don't use BE or LE. | ||
host_endian = GGUFEndian.BIG | ||
swapped_endian = GGUFEndian.LITTLE | ||
|
||
if reader.byte_order == "S": | ||
return swapped_endian | ||
else: | ||
return host_endian | ||
|
||
|
||
def decode_field(field: ReaderField) -> Any: | ||
if field and field.types: | ||
main_type = field.types[0] | ||
|
||
if main_type == GGUFValueType.ARRAY: | ||
sub_type = field.types[-1] | ||
|
||
if sub_type == GGUFValueType.STRING: | ||
return [str(bytes(field.parts[idx]), encoding='utf8') for idx in field.data] | ||
else: | ||
return [pv for idx in field.data for pv in field.parts[idx].tolist()] | ||
if main_type == GGUFValueType.STRING: | ||
return str(bytes(field.parts[-1]), encoding='utf8') | ||
else: | ||
return field.parts[-1][0] | ||
|
||
return None | ||
|
||
|
||
def get_field_data(reader: GGUFReader, key: str) -> Any: | ||
field = reader.get_field(key) | ||
|
||
return decode_field(field) | ||
Check failure on line 72 in gguf-py/scripts/gguf-addfile.py
|
||
|
||
|
||
def copy_with_filename(reader: gguf.GGUFReader, writer: gguf.GGUFWriter, filename: str[Any]) -> None: | ||
Check failure on line 75 in gguf-py/scripts/gguf-addfile.py
|
||
logger.debug(f'copy_with_filename: {filename}') #debug | ||
val = filename | ||
for field in reader.fields.values(): | ||
# Suppress virtual fields and fields written by GGUFWriter | ||
if field.name == Keys.General.ARCHITECTURE or field.name.startswith('GGUF.'): | ||
logger.debug(f'Suppressing {field.name}') | ||
continue | ||
|
||
# Copy existed fields except 'embedded_files' | ||
if not field.name == Keys.EMBEDDED_FILES: | ||
cur_val = decode_field(field) | ||
writer.add_key(field.name) | ||
writer.add_val(cur_val, field.types[0]) | ||
logger.debug(f'Copying {field.name}') | ||
continue | ||
|
||
# Update embedded_files | ||
val = decode_field(field) | ||
for path in filename: | ||
logger.debug(f'Adding {field.name}: {path}') | ||
val.append(path) | ||
|
||
# Add filenames to kv | ||
logger.info(f'* Modifying {Keys.EMBEDDED_FILES} to {val}') | ||
writer.add_array(Keys.EMBEDDED_FILES, val) | ||
|
||
for tensor in reader.tensors: | ||
# Dimensions are written in reverse order, so flip them first | ||
shape = np.flipud(tensor.shape) | ||
writer.add_tensor_info(tensor.name, shape, tensor.data.dtype, tensor.data.nbytes, tensor.tensor_type) | ||
|
||
# Add file info as tensor_info | ||
for path in filename: | ||
logger.debug(f'Adding tensor_info {path}') | ||
with open(path, "rb") as f: | ||
data = f.read() | ||
data_len = len(data) | ||
dims = [data_len] | ||
raw_dtype = GGMLQuantizationType.I8 | ||
writer.add_tensor_info(path, dims, np.float16, data_len, raw_dtype) | ||
|
||
writer.write_header_to_file() | ||
writer.write_kv_data_to_file() | ||
writer.write_ti_data_to_file() | ||
|
||
for tensor in reader.tensors: | ||
writer.write_tensor_data(tensor.data) | ||
|
||
# Write file body as tensor data | ||
for path in filename: | ||
logger.debug(f'Adding tensor data {path}') | ||
with open(path, "rb") as f: | ||
data = f.read() | ||
data_len = len(data) | ||
# write data with padding | ||
writer.write_data(data) | ||
|
||
writer.close() | ||
|
||
|
||
def main() -> None: | ||
parser = argparse.ArgumentParser(description="Add files to GGUF file metadata") | ||
parser.add_argument("input", type=str, help="GGUF format model input filename") | ||
parser.add_argument("output", type=str, help="GGUF format model output filename") | ||
parser.add_argument("addfiles", type=str, nargs='+', help="add filenames ...") | ||
parser.add_argument("--force", action="store_true", help="Bypass warnings without confirmation") | ||
parser.add_argument("--verbose", action="store_true", help="Increase output verbosity") | ||
args = parser.parse_args(None if len(sys.argv) > 1 else ["--help"]) | ||
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) | ||
|
||
logger.info(f'* Loading: {args.input}') | ||
reader = GGUFReader(args.input, 'r') | ||
arch = get_field_data(reader, Keys.General.ARCHITECTURE) | ||
endianess = get_byteorder(reader) | ||
|
||
if os.path.isfile(args.output) and not args.force: | ||
logger.warning('*** Warning *** Warning *** Warning **') | ||
logger.warning(f'* The "{args.output}" GGUF file already exists, it will be overwritten!') | ||
logger.warning('* Enter exactly YES if you are positive you want to proceed:') | ||
response = input('YES, I am sure> ') | ||
if response != 'YES': | ||
logger.info("You didn't enter YES. Okay then, see ya!") | ||
sys.exit(0) | ||
|
||
logger.info(f'* Writing: {args.output}') | ||
writer = GGUFWriter(args.output, arch=arch, endianess=endianess) | ||
|
||
alignment = get_field_data(reader, Keys.General.ALIGNMENT) | ||
if alignment is not None: | ||
logger.debug(f'Setting custom alignment: {alignment}') | ||
writer.data_alignment = alignment | ||
|
||
if args.addfiles is not None: | ||
filename = [] | ||
for path in args.addfiles: | ||
filename.append(path) | ||
logger.info(f'* Adding: {path}') | ||
copy_with_filename(reader, writer, filename) | ||
Check failure on line 173 in gguf-py/scripts/gguf-addfile.py
|
||
|
||
|
||
if __name__ == '__main__': | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use underscore in the python filenames:
gguf-addfile.py
->gguf_add_file.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for checking.
I renamed the script to gguf_add_file.py.