Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various issues around close/1 and stream realiasing #2818

Merged
merged 6 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/machine/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ impl Machine {
pub(crate) fn push_load_context(&mut self) -> CallResult {
let stream = self.machine_st.get_stream_or_alias(
self.machine_st.registers[1],
&self.indices.stream_aliases,
&self.indices,
atom!("$push_load_context"),
2,
)?;
Expand Down
124 changes: 121 additions & 3 deletions src/machine/machine_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::atom_table::*;
use crate::forms::*;
use crate::machine::loader::*;
use crate::machine::machine_state::*;
use crate::machine::streams::Stream;
use crate::machine::streams::{Stream, StreamOptions};
use crate::machine::ClauseType;
use crate::machine::MachineStubGen;

use fxhash::FxBuildHasher;
use indexmap::{IndexMap, IndexSet};
Expand Down Expand Up @@ -261,8 +262,8 @@ pub struct IndexStore {
pub(super) meta_predicates: MetaPredicateDir,
pub(super) modules: ModuleDir,
pub(super) op_dir: OpDir,
pub(super) streams: StreamDir,
pub(super) stream_aliases: StreamAliasDir,
streams: StreamDir,
stream_aliases: StreamAliasDir,
}

impl IndexStore {
Expand Down Expand Up @@ -459,6 +460,113 @@ impl IndexStore {
}
}

pub(crate) fn add_stream(
&mut self,
stream: Stream,
stub_name: Atom,
stub_arity: usize,
) -> Result<(), MachineStubGen> {
if let Some(alias) = stream.options().get_alias() {
if self.stream_aliases.contains_key(&alias) {
return Err(Box::new(move |machine_st| {
machine_st.occupied_alias_permission_error(alias, stub_name, stub_arity)
}));
}

self.stream_aliases.insert(alias, stream);
}

self.streams.insert(stream);

Ok(())
}

pub(crate) fn remove_stream(&mut self, stream: Stream) {
if let Some(alias) = stream.options().get_alias() {
debug_assert_eq!(self.stream_aliases.get(&alias), Some(&stream));
assert!(!is_protected_alias(alias));

self.stream_aliases.swap_remove(&alias);
}
self.streams.remove(&stream);
}

pub(crate) fn update_stream_options<F: Fn(&mut StreamOptions)>(
&mut self,
mut stream: Stream,
callback: F,
) {
if let Some(prev_alias) = stream.options().get_alias() {
debug_assert_eq!(self.stream_aliases.get(&prev_alias), Some(&stream));
}
let options = stream.options_mut();
let prev_alias = options.get_alias();

callback(options);

if options.get_alias() != prev_alias {
if prev_alias.map(is_protected_alias).unwrap_or(false)
|| options
.get_alias()
.map(|alias| self.has_stream(alias))
.unwrap_or(false)
{
// user_input, user_output and user_error cannot be realiased,
// and realiasing cannot shadow an existing stream.
options.set_alias_to_atom_opt(prev_alias);
return;
}

if let Some(prev_alias) = prev_alias {
self.stream_aliases.swap_remove(&prev_alias);
}
if let Some(new_alias) = options.get_alias() {
self.stream_aliases.insert(new_alias, stream);
}
}
}

pub(crate) fn has_stream(&self, alias: Atom) -> bool {
self.stream_aliases.contains_key(&alias)
}

/// ## Warning
///
/// The returned stream's options should only be modified through
/// [`IndexStore::update_stream_options`], to avoid breaking the
/// invariants of [`IndexStore`].
pub(crate) fn get_stream(&self, alias: Atom) -> Option<Stream> {
self.stream_aliases.get(&alias).copied()
}

pub(crate) fn iter_streams<'a, R: std::ops::RangeBounds<Stream>>(
&'a self,
range: R,
) -> impl Iterator<Item = Stream> + 'a {
self.streams.range(range).into_iter().copied()
}

/// Forcibly sets `alias` to `stream`.
/// If there was a previous stream with that alias, it will lose that alias.
///
/// Consider using [`add_stream`](Self::add_stream) if you wish to instead
/// return an error when stream aliases conflict.
pub(crate) fn set_stream(&mut self, alias: Atom, mut stream: Stream) {
if let Some(mut prev_stream) = self.get_stream(alias) {
if prev_stream == stream {
// Nothing to do, as the stream is already present
return;
}

prev_stream.options_mut().set_alias_to_atom_opt(None);
}

stream.options_mut().set_alias_to_atom_opt(Some(alias));

self.stream_aliases.insert(alias, stream);
self.streams.insert(stream);
}

#[inline]
pub(super) fn new() -> Self {
index_store!(
Expand All @@ -468,3 +576,13 @@ impl IndexStore {
)
}
}

/// A stream is said to have a "protected" alias if modifying its
/// alias would cause breakage in other parts of the code.
///
/// A stream with a protected alias cannot be realiased through
/// [`IndexStore::update_stream_options`]. Instead, one has to use
/// [`IndexStore::set_stream`] to do so.
fn is_protected_alias(alias: Atom) -> bool {
alias == atom!("user_input") || alias == atom!("user_output") || alias == atom!("user_error")
}
28 changes: 6 additions & 22 deletions src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,32 +483,16 @@ impl Machine {
}
}

/// Ensures that [`Machine::indices`] properly reflects
/// the streams stored in [`Machine::user_input`], [`Machine::user_output`]
/// and [`Machine::user_error`].
pub(crate) fn configure_streams(&mut self) {
self.user_input
.options_mut()
.set_alias_to_atom_opt(Some(atom!("user_input")));

self.indices
.stream_aliases
.insert(atom!("user_input"), self.user_input);

self.indices.streams.insert(self.user_input);

self.user_output
.options_mut()
.set_alias_to_atom_opt(Some(atom!("user_output")));

.set_stream(atom!("user_input"), self.user_input);
self.indices
.stream_aliases
.insert(atom!("user_output"), self.user_output);

self.indices.streams.insert(self.user_output);

.set_stream(atom!("user_output"), self.user_output);
self.indices
.stream_aliases
.insert(atom!("user_error"), self.user_error);

self.indices.streams.insert(self.user_error);
.set_stream(atom!("user_error"), self.user_error);
}

#[inline(always)]
Expand Down
Loading
Loading