Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions bc2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from .core import Pipeline, PipelineConfig
from .core import IdToMaskMap, IdToNameMap, NameToMaskMap, Pipeline, PipelineConfig

__all__ = ["Pipeline", "PipelineConfig"]
__all__ = [
"Pipeline",
"PipelineConfig",
"NameToMaskMap",
"IdToNameMap",
"IdToMaskMap",
]
9 changes: 8 additions & 1 deletion bc2/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from .common.name_map import IdToMaskMap, IdToNameMap, NameToMaskMap
from .pipeline import Pipeline, PipelineConfig

__all__ = ["Pipeline", "PipelineConfig"]
__all__ = [
"Pipeline",
"PipelineConfig",
"NameToMaskMap",
"IdToNameMap",
"IdToMaskMap",
]
28 changes: 28 additions & 0 deletions bc2/core/common/all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Union

from ..extract import ExtractConfig
from ..input import InputConfig
from ..inspect import InspectConfig
from ..output import OutputConfig
from ..parse import ParseConfig
from ..redact import RedactConfig
from ..render import RenderConfig

# Union of the config types imported by this module. `ChunkConfig` and
# `ComposeConfig` are written as string forward references because their
# modules are imported only at the bottom of this module (circular imports).
AnyConfig = Union[
InputConfig,
ExtractConfig,
RedactConfig,
InspectConfig,
ParseConfig,
RenderConfig,
OutputConfig,
"ChunkConfig",
"ComposeConfig",
]

# NOTE(jnu): the following two imports support some degree of recursive
# module definition, so there are circular imports. To avoid partial import
# errors, we use a forward-ref above and import the modules at the end of
# this module.
from ..control.chunk import ChunkConfig
from ..control.compose import ComposeConfig
10 changes: 10 additions & 0 deletions bc2/core/common/infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def segment( # noqa: C901
original: str,
redacted: str,
delimiters: Sequence[str] = ("<", ">"),
truncated: bool = False,
) -> Generator[TextSegment, None, None]:
"""Visit text segments in the redacted narrative.

Expand All @@ -163,7 +164,16 @@ def segment( # noqa: C901
original: The original narrative text.
redacted: The redacted narrative text.
delimiters: The tokens that mark beginning and end of a redaction.
truncated: If the redacted input might be truncated, call this
function with `truncated=True` to clean up any potential
"ragged ends" at the end of the document.
"""
if truncated:
# TODO(jnu): might be worth just calling this every time. If the
# risk of false positives in removing hanging redactions is basically
# zero, we should get rid of the option and just always do it.
redacted = remove_hanging_redactions(redacted, raw_delimiters=delimiters)

edit_stack = 0
matcher = difflib.SequenceMatcher(None, original, redacted, autojunk=False)

Expand Down
34 changes: 34 additions & 0 deletions bc2/core/common/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
import logging

logger = logging.getLogger(__name__)


def parse_llm_json(s: str, debug: bool = False) -> dict:
    """Parse a JSON response from an LLM.

    Account for potential funkiness in the response, according to
    how LLMs tend to format their responses: surrounding whitespace is
    ignored, and a Markdown code fence (with or without a `json` language
    tag) wrapped around the payload is removed before decoding.

    Args:
        s: The JSON response.
        debug: If true, log the decode error and the cleaned-up input at
            error level before raising.

    Returns:
        The parsed JSON response.

    Raises:
        ValueError: If the JSON response is invalid.
    """
    # Responses are frequently padded with blank lines (e.g. a newline after
    # the closing fence). Strip first so the fence checks below actually see
    # the fence rather than the padding.
    s = s.strip()
    if s.startswith("```json"):
        s = s[7:]
    if s.startswith("```"):
        s = s[3:]
    if s.endswith("```"):
        s = s[:-3]
    try:
        return json.loads(s)
    except json.JSONDecodeError as e:
        if debug:
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.error("Error parsing JSON: %s", e)
            logger.error("Input:\n%s", s)
        raise ValueError("Error parsing JSON") from e
191 changes: 191 additions & 0 deletions bc2/core/common/name_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
try:
from typing import Self
except ImportError:
from typing_extensions import Self

from abc import ABC, abstractmethod
from xml.sax.saxutils import escape as xml_escape

_NameMap = dict[str, str]
"""A mapping of human names to aliases.

This could be name to alias, or ID to name, etc.

Example:
{
"Leopold Nudell": "Accused 1",
}

As this is really just a generic dictionary, it's confusing to wield it correctly.
DO NOT use this as a top-level type. Instead, see subclasses of `_NameMapContainer`
to find the correct mapping.
"""

# List-of-records form of a name map, as returned by
# `_NameMapContainer.to_json`: one two-entry dict per mapping, keyed by the
# container's `key_label` and `value_label`.
_VerboseNameMap = list[dict[str, str]]


class _NameMapContainer(ABC):
    """Abstract wrapper around a `_NameMap` dictionary.

    Do not use this directly; see subclasses. Concrete subclasses provide
    the four label properties that name the elements / keys used when the
    map is serialized by `to_xml` and `to_json`.
    """

    def __init__(self, initial_map: _NameMap | None = None) -> None:
        # Work on a copy so the caller's dictionary is never mutated.
        self._map: _NameMap = initial_map.copy() if initial_map else {}

    @classmethod
    def merge(cls, *maps: "_NameMapContainer | dict[str, str] | None") -> Self:
        """Combine several maps into a new instance of `cls`.

        Falsy arguments (`None`, empty dicts) are skipped. Later maps
        overwrite earlier ones on key collisions. With nothing to merge,
        an empty instance is returned.
        """
        combined = cls()
        for candidate in maps:
            if not candidate:
                continue
            # Plain dicts are merged as-is; containers contribute their
            # underlying dictionary.
            entries = candidate if isinstance(candidate, dict) else candidate._map
            combined._map.update(entries)
        return combined

    @property
    @abstractmethod
    def collection_label(self) -> str:
        """Label for the element wrapping the whole collection."""

    @property
    @abstractmethod
    def item_label(self) -> str:
        """Label for the element wrapping a single entry."""

    @property
    @abstractmethod
    def key_label(self) -> str:
        """Label used for an entry's key."""

    @property
    @abstractmethod
    def value_label(self) -> str:
        """Label used for an entry's value."""

    def _set_value(self, key: str, value: str):
        """Store `value` under `key`, replacing any existing entry."""
        self._map[key] = value

    def __contains__(self, key: str) -> bool:
        return key in self._map

    def to_xml(self) -> str:
        """Serialize the map as an XML string.

        Keys and values are XML-escaped; the labels are emitted verbatim.
        """
        pieces = [f"<{self.collection_label}>"]
        for entry_key, entry_value in self._map.items():
            pieces.append(
                f"<{self.item_label}>"
                f"<{self.key_label}>{xml_escape(entry_key)}</{self.key_label}>"
                f"<{self.value_label}>{xml_escape(entry_value)}</{self.value_label}>"
                f"</{self.item_label}>"
            )
        pieces.append(f"</{self.collection_label}>")
        return "".join(pieces)

    def to_json(self) -> _VerboseNameMap:
        """Return the map as a list of `{key_label: ..., value_label: ...}` dicts."""
        records = []
        for entry_key, entry_value in self._map.items():
            records.append({self.key_label: entry_key, self.value_label: entry_value})
        return records

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._map})"

    def __str__(self) -> str:
        return str(self._map)

    def __eq__(self, value: object) -> bool:
        # Anything that is not a name-map container is handed to the
        # default comparison.
        if not isinstance(value, _NameMapContainer):
            return super().__eq__(value)
        # Equal only when both the concrete class and the contents match.
        return self.__class__ == value.__class__ and self._map == value._map


class NameToMaskMap(_NameMapContainer):
    """Maps real names to the placeholder text that replaces them.

    example:
        rm = NameToMaskMap()
        rm.set_mask("Leopold Nudell", "Accused 1")
    """

    def set_mask(self, name: str, mask: str) -> "NameToMaskMap":
        """Record `mask` as the placeholder for `name`; returns self for chaining."""
        self._set_value(name, mask)
        return self

    # Labels used by the inherited serializers.

    @property
    def value_label(self) -> str:
        return "ReplacementText"

    @property
    def key_label(self) -> str:
        return "RealName"

    @property
    def item_label(self) -> str:
        return "Name"

    @property
    def collection_label(self) -> str:
        return "Names"


class IdToNameMap(_NameMapContainer):
    """Maps IDs to the human names they identify.

    example:
        im = IdToNameMap()
        im.set_name("1234", "Leopold Nudell")
    """

    def set_name(self, id: str, name: str) -> "IdToNameMap":
        """Record `name` as the human name for `id`; returns self for chaining."""
        self._set_value(id, name)
        return self

    # Labels used by the inherited serializers.

    @property
    def value_label(self) -> str:
        return "RealName"

    @property
    def key_label(self) -> str:
        return "ID"

    @property
    def item_label(self) -> str:
        return "Name"

    @property
    def collection_label(self) -> str:
        return "Names"


class IdToMaskMap(_NameMapContainer):
    """Maps IDs to the mask values standing in for them.

    example:
        mm = IdToMaskMap()
        mm.set_mask("1234", "Accused 1")
    """

    def set_mask(self, id: str, mask: str) -> "IdToMaskMap":
        """Record `mask` as the masked name for `id`; returns self for chaining."""
        self._set_value(id, mask)
        return self

    # Labels used by the inherited serializers.

    @property
    def value_label(self) -> str:
        return "MaskedName"

    @property
    def key_label(self) -> str:
        return "ID"

    @property
    def item_label(self) -> str:
        return "Name"

    @property
    def collection_label(self) -> str:
        return "Names"
Loading