From 2fe7b55343ccf6396f4ad6a8088462592302ad06 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Mon, 3 Feb 2025 00:07:39 +0100 Subject: [PATCH 1/6] Fix close/1 messing up stream_aliases when user_input or user_output aren't set to Stdin and Stdout --- src/machine/streams.rs | 62 ++++++++++++++++++++++++++++++++++--- src/machine/system_calls.rs | 51 +++++++++--------------------- 2 files changed, 73 insertions(+), 40 deletions(-) diff --git a/src/machine/streams.rs b/src/machine/streams.rs index d434c569a..212ec4637 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -1288,11 +1288,10 @@ impl Stream { )) } + /// Drops the stream handle and marks the arena pointer as [`ArenaHeaderTag::Dropped`]. #[inline] pub(crate) fn close(&mut self) -> Result<(), std::io::Error> { - let mut stream = std::mem::replace(self, Stream::Null(StreamOptions::default())); - - match stream { + match self { Stream::NamedTcp(ref mut tcp_stream) => { tcp_stream.inner_mut().tcp_stream.shutdown(Shutdown::Both) } @@ -1322,7 +1321,20 @@ impl Stream { Ok(()) } - _ => Ok(()), + Stream::Byte(mut stream) => { + stream.drop_payload(); + Ok(()) + } + Stream::StaticString(mut stream) => { + stream.drop_payload(); + Ok(()) + } + + Stream::Null(_) => Ok(()), + + Stream::Readline(_) | Stream::StandardOutput(_) | Stream::StandardError(_) => { + unreachable!(); + } } } @@ -1893,3 +1905,45 @@ impl MachineState { } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::machine::config::*; + + #[test] + #[cfg_attr(miri, ignore)] + fn close_memory_user_output_stream() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query( + "\\+ \\+ (current_output(Stream), close(Stream)), write(user_output, hello).", + ) + .collect::<Vec<_>>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + + let mut actual = String::new(); + machine.user_output.read_to_string(&mut actual).unwrap(); + assert_eq!(actual, 
"hello"); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_memory_user_output_stream_twice() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query("\\+ \\+ (current_output(Stream), close(Stream), close(Stream)).") + .collect::<Vec<_>>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } +} diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 14f9094b8..7a7eeb57c 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -3868,47 +3868,26 @@ impl Machine { stream.flush().unwrap(); // 8.11.6.1b) } - self.indices.streams.remove(&stream); - - if stream == self.user_input { - self.user_input = self - .indices - .stream_aliases - .get(&atom!("user_input")) - .cloned() - .unwrap(); - - self.indices.streams.insert(self.user_input); - } else if stream == self.user_output { - self.user_output = self - .indices - .stream_aliases - .get(&atom!("user_output")) - .cloned() - .unwrap(); - - self.indices.streams.insert(self.user_output); + if stream == self.user_input || stream == self.user_output || stream.is_stderr() { + // stdin, stdout and stderr shouldn't be removed from the store, so return now + return Ok(()); } - if !stream.is_stdin() && !stream.is_stdout() && !stream.is_stderr() { - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.swap_remove(&alias); - } - - let close_result = stream.close(); - - if close_result.is_err() { - let stub = functor_stub(atom!("close"), 1); - let addr = stream_as_cell!(stream); - let err = self - .machine_st - .existence_error(ExistenceError::Stream(addr)); + self.indices.streams.remove(&stream); - return Err(self.machine_st.error_form(err, stub)); - } + if let Some(alias) = stream.options().get_alias() { + self.indices.stream_aliases.swap_remove(&alias); } - Ok(()) + stream.close().map_err(|_| { + let stub = functor_stub(atom!("close"), 1); + let addr = 
stream_as_cell!(stream); + let err = self + .machine_st + .existence_error(ExistenceError::Stream(addr)); + + self.machine_st.error_form(err, stub) + }) } #[inline(always)] From 0cf46d3ec4f0fc1e9fd9c8222336a28ea3b4ad3b Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Mon, 3 Feb 2025 14:00:37 +0100 Subject: [PATCH 2/6] Encapsulate accesses to IndexStore::streams and ::stream_aliases These two fields are able to hold `Stream` instances, which predicates like `close/1` expect to be managed properly for their correctness. To ensure that this is the case, I have removed direct accesses to those two fields, so that they can be properly managed in one place. --- src/machine/loader.rs | 2 +- src/machine/machine_indices.rs | 105 ++++++++++++++++++++- src/machine/mod.rs | 28 ++---- src/machine/streams.rs | 12 +-- src/machine/system_calls.rs | 164 +++++++++++++++------------------ 5 files changed, 187 insertions(+), 124 deletions(-) diff --git a/src/machine/loader.rs b/src/machine/loader.rs index 90080e9e5..b7e7c4b5c 100644 --- a/src/machine/loader.rs +++ b/src/machine/loader.rs @@ -1808,7 +1808,7 @@ impl Machine { pub(crate) fn push_load_context(&mut self) -> CallResult { let stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("$push_load_context"), 2, )?; diff --git a/src/machine/machine_indices.rs b/src/machine/machine_indices.rs index 2f2591252..4c8490683 100644 --- a/src/machine/machine_indices.rs +++ b/src/machine/machine_indices.rs @@ -7,8 +7,9 @@ use crate::atom_table::*; use crate::forms::*; use crate::machine::loader::*; use crate::machine::machine_state::*; -use crate::machine::streams::Stream; +use crate::machine::streams::{Stream, StreamOptions}; use crate::machine::ClauseType; +use crate::machine::MachineStubGen; use fxhash::FxBuildHasher; use indexmap::{IndexMap, IndexSet}; @@ -261,8 +262,8 @@ pub struct IndexStore { pub(super) meta_predicates: MetaPredicateDir, pub(super) modules: 
ModuleDir, pub(super) op_dir: OpDir, - pub(super) streams: StreamDir, - pub(super) stream_aliases: StreamAliasDir, + streams: StreamDir, + stream_aliases: StreamAliasDir, } impl IndexStore { @@ -459,6 +460,94 @@ impl IndexStore { } } + pub(crate) fn add_stream( + &mut self, + stream: Stream, + stub_name: Atom, + stub_arity: usize, + ) -> Result<(), MachineStubGen> { + if let Some(alias) = stream.options().get_alias() { + if self.stream_aliases.contains_key(&alias) { + return Err(Box::new(move |machine_st| { + machine_st.occupied_alias_permission_error(alias, stub_name, stub_arity) + })); + } + + self.stream_aliases.insert(alias, stream); + } + + self.streams.insert(stream); + + Ok(()) + } + + pub(crate) fn remove_stream(&mut self, stream: Stream) { + if let Some(alias) = stream.options().get_alias() { + debug_assert_eq!(self.stream_aliases.get(&alias), Some(&stream)); + self.stream_aliases.swap_remove(&alias); + } + self.streams.remove(&stream); + } + + pub(crate) fn update_stream_options( + &mut self, + mut stream: Stream, + callback: F, + ) { + if let Some(prev_alias) = stream.options().get_alias() { + debug_assert_eq!(self.stream_aliases.get(&prev_alias), Some(&stream)); + } + let options = stream.options_mut(); + let prev_alias = options.get_alias(); + + callback(options); + + if options.get_alias() != prev_alias { + if let Some(prev_alias) = prev_alias { + self.stream_aliases.swap_remove(&prev_alias); + } + if let Some(new_alias) = options.get_alias() { + self.stream_aliases.insert(new_alias, stream); + } + } + } + + pub(crate) fn has_stream(&self, alias: Atom) -> bool { + self.stream_aliases.contains_key(&alias) + } + + pub(crate) fn get_stream(&self, alias: Atom) -> Option { + self.stream_aliases.get(&alias).copied() + } + + pub(crate) fn iter_streams<'a, R: std::ops::RangeBounds>( + &'a self, + range: R, + ) -> impl Iterator + 'a { + self.streams.range(range).into_iter().copied() + } + + /// Forcibly sets `alias` to `stream`. 
+ /// If there was a previous stream with that alias, it will lose that alias. + /// + /// Consider using [`add_stream`](Self::add_stream) if you wish to instead + /// return an error when stream aliases conflict. + pub(crate) fn set_stream(&mut self, alias: Atom, mut stream: Stream) { + if let Some(mut prev_stream) = self.get_stream(alias) { + if prev_stream == stream { + // Nothing to do, as the stream is already present + return; + } + + prev_stream.options_mut().set_alias_to_atom_opt(None); + } + + stream.options_mut().set_alias_to_atom_opt(Some(alias)); + + self.stream_aliases.insert(alias, stream); + self.streams.insert(stream); + } + #[inline] pub(super) fn new() -> Self { index_store!( @@ -468,3 +557,13 @@ impl IndexStore { ) } } + +/// A stream is said to have a "protected" alias if modifying its +/// alias would cause breakage in other parts of the code. +/// +/// A stream with a protected alias cannot be realiased through +/// [`IndexStore::update_stream_options`]. Instead, one has to use +/// [`IndexStore::set_stream`] to do so. +fn is_protected_alias(alias: Atom) -> bool { + alias == atom!("user_input") || alias == atom!("user_output") || alias == atom!("user_error") +} diff --git a/src/machine/mod.rs b/src/machine/mod.rs index a62c0caa8..57a6213b4 100644 --- a/src/machine/mod.rs +++ b/src/machine/mod.rs @@ -483,32 +483,16 @@ impl Machine { } } + /// Ensures that [`Machine::indices`] properly reflects + /// the streams stored in [`Machine::user_input`], [`Machine::user_output`] + /// and [`Machine::user_error`]. 
pub(crate) fn configure_streams(&mut self) { - self.user_input - .options_mut() - .set_alias_to_atom_opt(Some(atom!("user_input"))); - self.indices - .stream_aliases - .insert(atom!("user_input"), self.user_input); - - self.indices.streams.insert(self.user_input); - - self.user_output - .options_mut() - .set_alias_to_atom_opt(Some(atom!("user_output"))); - + .set_stream(atom!("user_input"), self.user_input); self.indices - .stream_aliases - .insert(atom!("user_output"), self.user_output); - - self.indices.streams.insert(self.user_output); - + .set_stream(atom!("user_output"), self.user_output); self.indices - .stream_aliases - .insert(atom!("user_error"), self.user_error); - - self.indices.streams.insert(self.user_error); + .set_stream(atom!("user_error"), self.user_error); } #[inline(always)] diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 212ec4637..3432fadcd 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -1594,7 +1594,7 @@ impl MachineState { pub(crate) fn get_stream_or_alias( &mut self, addr: HeapCellValue, - stream_aliases: &StreamAliasDir, + indices: &IndexStore, caller: Atom, arity: usize, ) -> Result { @@ -1604,8 +1604,8 @@ impl MachineState { (HeapCellValueTag::Atom, (name, arity)) => { debug_assert_eq!(arity, 0); - return match stream_aliases.get(&name) { - Some(stream) if !stream.is_null_stream() => Ok(*stream), + return match indices.get_stream(name) { + Some(stream) if !stream.is_null_stream() => Ok(stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@ -1622,8 +1622,8 @@ impl MachineState { debug_assert_eq!(arity, 0); - return match stream_aliases.get(&name) { - Some(stream) if !stream.is_null_stream() => Ok(*stream), + return match indices.get_stream(name) { + Some(stream) if !stream.is_null_stream() => Ok(stream), _ => { let stub = functor_stub(caller, arity); let addr = atom_as_cell!(name); @@ -1813,7 +1813,7 @@ impl MachineState { // 8.11.5.3l) if let Some(alias) = 
options.get_alias() { - if indices.stream_aliases.contains_key(&alias) { + if indices.has_stream(alias) { return Err(self.occupied_alias_permission_error(alias, atom!("open"), 4)); } } diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 7a7eeb57c..2b05b8f03 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -41,7 +41,6 @@ use indexmap::IndexSet; use std::cell::Cell; use std::cmp::Ordering; -use std::collections::BTreeSet; use std::convert::TryFrom; use std::env; #[cfg(feature = "ffi")] @@ -55,7 +54,6 @@ use std::mem; use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{TcpListener, TcpStream}; use std::num::NonZeroU32; -use std::ops::Sub; use std::process; #[cfg(feature = "http")] use std::str::FromStr; @@ -2543,7 +2541,7 @@ impl Machine { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("peek_byte"), 2, )?; @@ -2634,7 +2632,7 @@ impl Machine { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("peek_char"), 2, )?; @@ -2726,7 +2724,7 @@ impl Machine { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("peek_code"), 2, )?; @@ -3147,7 +3145,7 @@ impl Machine { pub(crate) fn put_code(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("put_code"), 2, )?; @@ -3199,7 +3197,7 @@ impl Machine { pub(crate) fn put_char(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("put_char"), 2, )?; @@ -3243,7 +3241,7 @@ impl Machine { pub(crate) fn put_chars(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( 
self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("$put_chars"), 2, )?; @@ -3292,7 +3290,7 @@ impl Machine { pub(crate) fn put_byte(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("put_byte"), 2, )?; @@ -3359,7 +3357,7 @@ impl Machine { pub(crate) fn get_byte(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("get_byte"), 2, )?; @@ -3444,7 +3442,7 @@ impl Machine { pub(crate) fn get_char(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("get_char"), 2, )?; @@ -3539,7 +3537,7 @@ impl Machine { pub(crate) fn get_n_chars(&mut self) -> CallResult { let stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("get_n_chars"), 3, )?; @@ -3608,7 +3606,7 @@ impl Machine { pub(crate) fn get_code(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("get_code"), 2, )?; @@ -3715,19 +3713,11 @@ impl Machine { #[inline(always)] pub(crate) fn first_stream(&mut self) { - let mut first_stream = None; - let mut null_streams = BTreeSet::new(); - - for stream in self.indices.streams.iter().cloned() { - if !stream.is_null_stream() { - first_stream = Some(stream); - break; - } else { - null_streams.insert(stream); - } - } - - self.indices.streams = self.indices.streams.sub(&null_streams); + let first_stream = self + .indices + .iter_streams(..) 
+ .filter(|s| !s.is_null_stream()) + .next(); if let Some(first_stream) = first_stream { let stream = stream_as_cell!(first_stream); @@ -3743,20 +3733,12 @@ impl Machine { #[inline(always)] pub(crate) fn next_stream(&mut self) { let prev_stream = cell_as_stream!(self.deref_register(1)); - - let mut next_stream = None; - let mut null_streams = BTreeSet::new(); - - for stream in self.indices.streams.range(prev_stream..).skip(1).cloned() { - if !stream.is_null_stream() { - next_stream = Some(stream); - break; - } else { - null_streams.insert(stream); - } - } - - self.indices.streams = self.indices.streams.sub(&null_streams); + let next_stream = self + .indices + .iter_streams(prev_stream..) + .filter(|s| !s.is_null_stream()) + .skip(1) + .next(); if let Some(next_stream) = next_stream { let var = self.deref_register(2).as_var().unwrap(); @@ -3772,7 +3754,7 @@ impl Machine { pub(crate) fn flush_output(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("flush_output"), 1, )?; @@ -3859,7 +3841,7 @@ impl Machine { pub(crate) fn close(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("close"), 2, )?; @@ -3873,11 +3855,7 @@ impl Machine { return Ok(()); } - self.indices.streams.remove(&stream); - - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.swap_remove(&alias); - } + self.indices.remove_stream(stream); stream.close().map_err(|_| { let stub = functor_stub(atom!("close"), 1); @@ -4445,11 +4423,10 @@ impl Machine { &mut self.machine_st.arena, ); *stream.options_mut() = StreamOptions::default(); - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.insert(alias, stream); - } - self.indices.streams.insert(stream); + self.indices + .add_stream(stream, atom!("http_open"), 3) + 
.map_err(|stub_gen| stub_gen(&mut self.machine_st))?; let stream = stream_as_cell!(stream); @@ -4667,7 +4644,10 @@ impl Machine { ); *stream.options_mut() = StreamOptions::default(); stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); + + self.indices.add_stream(stream, atom!("http_accept"), 7) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + let stream = stream_as_cell!(stream); let handle: TypedArenaPtr = arena_alloc!(request.response, &mut self.machine_st.arena); @@ -4781,7 +4761,11 @@ impl Machine { ); *stream.options_mut() = StreamOptions::default(); stream.options_mut().set_stream_type(StreamType::Binary); - self.indices.streams.insert(stream); + + + self.indices.add_stream(stream, atom!("http_answer"), 4) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; + let stream = stream_as_cell!(stream); self.machine_st.bind(stream_addr.as_var().unwrap(), stream); } @@ -5096,11 +5080,10 @@ impl Machine { .stream_from_file_spec(file_spec, &mut self.indices, &options)?; *stream.options_mut() = options; - self.indices.streams.insert(stream); - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.insert(alias, stream); - } + self.indices + .add_stream(stream, atom!("open"), 4) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; let stream_var = self.deref_register(3); self.machine_st @@ -5181,7 +5164,7 @@ impl Machine { pub(crate) fn set_stream_options(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("open"), 4, )?; @@ -5937,12 +5920,9 @@ impl Machine { pub(crate) fn set_input(&mut self) -> CallResult { let addr = self.deref_register(1); - let stream = self.machine_st.get_stream_or_alias( - addr, - &self.indices.stream_aliases, - atom!("set_input"), - 1, - )?; + let stream = + self.machine_st + .get_stream_or_alias(addr, &self.indices, atom!("set_input"), 1)?; if 
!stream.is_input_stream() { let stub = functor_stub(atom!("set_input"), 1); @@ -5964,12 +5944,9 @@ impl Machine { #[inline(always)] pub(crate) fn set_output(&mut self) -> CallResult { let addr = self.deref_register(1); - let stream = self.machine_st.get_stream_or_alias( - addr, - &self.indices.stream_aliases, - atom!("set_output"), - 1, - )?; + let stream = + self.machine_st + .get_stream_or_alias(addr, &self.indices, atom!("set_output"), 1)?; if !stream.is_output_stream() { let stub = functor_stub(atom!("set_output"), 1); @@ -6275,7 +6252,7 @@ impl Machine { let stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("read_term"), 3, )?; @@ -6516,7 +6493,7 @@ impl Machine { } if let Some(alias) = options.get_alias() { - if self.indices.stream_aliases.contains_key(&alias) { + if self.indices.has_stream(alias) { return Err(self.machine_st.occupied_alias_permission_error( alias, atom!("socket_client_open"), @@ -6532,11 +6509,9 @@ impl Machine { *stream.options_mut() = options; - if let Some(alias) = stream.options().get_alias() { - self.indices.stream_aliases.insert(alias, stream); - } - - self.indices.streams.insert(stream); + self.indices + .add_stream(stream, atom!("socket_client_open"), 7) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; stream_as_cell!(stream) } @@ -6544,7 +6519,7 @@ impl Machine { return Err(self.machine_st.open_permission_error( addr, atom!("socket_client_open"), - 3, + 7, )); } Err(ErrorKind::NotFound) => { @@ -6661,7 +6636,7 @@ impl Machine { } if let Some(alias) = options.get_alias() { - if self.indices.stream_aliases.contains_key(&alias) { + if self.indices.has_stream(alias) { return Err(self.machine_st.occupied_alias_permission_error( alias, atom!("socket_server_accept"), @@ -6688,11 +6663,10 @@ impl Machine { *tcp_stream.options_mut() = options; - if let Some(alias) = &tcp_stream.options().get_alias() { - self.indices.stream_aliases.insert(*alias, 
tcp_stream); - } - - self.indices.streams.insert(tcp_stream); + self.indices.add_stream(tcp_stream, atom!("socket_server_accept"), 4) + .map_err(|stub_gen| { + stub_gen(&mut self.machine_st) + })?; let tcp_stream = stream_as_cell!(tcp_stream); let client = atom_as_cell!(client); @@ -6728,7 +6702,7 @@ impl Machine { { let stream0 = self.machine_st.get_stream_or_alias( self.machine_st.registers[2], - &self.indices.stream_aliases, + &self.indices, atom!("tls_client_negotiate"), 3, )?; @@ -6747,7 +6721,10 @@ impl Machine { let addr = atom!("TLS"); let stream = Stream::from_tls_stream(addr, stream, &mut self.machine_st.arena); - self.indices.streams.insert(stream); + + self.indices + .add_stream(stream, atom!("tls_client_negotiate"), 3) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; self.machine_st.heap.push(stream_as_cell!(stream)); let stream_addr = self.deref_register(3); @@ -6782,7 +6759,7 @@ impl Machine { let stream0 = self.machine_st.get_stream_or_alias( self.machine_st.registers[3], - &self.indices.stream_aliases, + &self.indices, atom!("tls_server_negotiate"), 3, )?; @@ -6801,7 +6778,10 @@ impl Machine { }; let stream = Stream::from_tls_stream(atom!("TLS"), stream, &mut self.machine_st.arena); - self.indices.streams.insert(stream); + + self.indices + .add_stream(stream, atom!("tls_server_negotiate"), 3) + .map_err(|stub_gen| stub_gen(&mut self.machine_st))?; let stream_addr = self.deref_register(4); self.machine_st @@ -6843,7 +6823,7 @@ impl Machine { pub(crate) fn set_stream_position(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("set_stream_position"), 2, )?; @@ -6886,7 +6866,7 @@ impl Machine { pub(crate) fn stream_property(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("stream_property"), 2, )?; @@ -7237,7 +7217,7 
@@ impl Machine { pub(crate) fn write_term(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("write_term"), 3, )?; @@ -8113,7 +8093,7 @@ impl Machine { pub(crate) fn devour_whitespace(&mut self) -> CallResult { let mut stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], - &self.indices.stream_aliases, + &self.indices, atom!("$devour_whitespace"), 1, )?; From 949d316773906807c1d9a07891411f66d72c62e5 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Mon, 3 Feb 2025 14:26:33 +0100 Subject: [PATCH 3/6] Fix realiased streams causing close/1 to leave a dangling stream --- src/machine/streams.rs | 22 +++++++++++++++++++++- src/machine/system_calls.rs | 8 +++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/machine/streams.rs b/src/machine/streams.rs index 3432fadcd..ca348a0ef 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -640,7 +640,7 @@ impl Stream { } } - pub fn options_mut(&mut self) -> &mut StreamOptions { + pub(super) fn options_mut(&mut self) -> &mut StreamOptions { match self { Stream::Byte(ref mut ptr) => &mut ptr.options, Stream::InputFile(ref mut ptr) => &mut ptr.options, @@ -1946,4 +1946,24 @@ mod test { assert_eq!(results.len(), 1); assert!(results[0].is_ok()); } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_realiased_stream() { + let mut machine = MachineBuilder::new().build(); + + let results = machine + .run_query(r#" + \+ \+ ( + open("README.md", read, S, [alias(readme)]), + open(stream(S), read, _, [alias(another_alias)]), + close(S) + ), + open("README.md", read, _, [alias(readme)]). 
+ "#) + .collect::<Vec<_>>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } } diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 2b05b8f03..50c28549f 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -5162,7 +5162,7 @@ impl Machine { #[inline(always)] pub(crate) fn set_stream_options(&mut self) -> CallResult { - let mut stream = self.machine_st.get_stream_or_alias( + let stream = self.machine_st.get_stream_or_alias( self.machine_st.registers[1], &self.indices, atom!("open"), @@ -5174,10 +5174,12 @@ impl Machine { let reposition = self.machine_st.registers[4]; let stream_type = self.machine_st.registers[5]; - let options = + let new_options = self.machine_st .get_stream_options(alias, eof_action, reposition, stream_type); - *stream.options_mut() = options; + self.indices.update_stream_options(stream, |options| { + *options = new_options; + }); Ok(()) } From 7f2ce57ba77b68fe85fcc25bbf94f9450ad9cc9d Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Mon, 3 Feb 2025 14:58:04 +0100 Subject: [PATCH 4/6] Fix stream realiasing possibly shadowing other streams. 
--- src/machine/machine_indices.rs | 19 ++++++++++++++ src/machine/streams.rs | 47 ++++++++++++++++++++++++++++++++-- 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/src/machine/machine_indices.rs b/src/machine/machine_indices.rs index 4c8490683..e2d7b2488 100644 --- a/src/machine/machine_indices.rs +++ b/src/machine/machine_indices.rs @@ -484,6 +484,8 @@ impl IndexStore { pub(crate) fn remove_stream(&mut self, stream: Stream) { if let Some(alias) = stream.options().get_alias() { debug_assert_eq!(self.stream_aliases.get(&alias), Some(&stream)); + assert!(!is_protected_alias(alias)); + self.stream_aliases.swap_remove(&alias); } self.streams.remove(&stream); @@ -503,6 +505,18 @@ impl IndexStore { callback(options); if options.get_alias() != prev_alias { + if prev_alias.map(is_protected_alias).unwrap_or(false) + || options + .get_alias() + .map(|alias| self.has_stream(alias)) + .unwrap_or(false) + { + // user_input, user_output and user_error cannot be realiased, + // and realiasing cannot shadow an existing stream. + options.set_alias_to_atom_opt(prev_alias); + return; + } + if let Some(prev_alias) = prev_alias { self.stream_aliases.swap_remove(&prev_alias); } @@ -516,6 +530,11 @@ impl IndexStore { self.stream_aliases.contains_key(&alias) } + /// ## Warning + /// + /// The returned stream's options should only be modified through + /// [`IndexStore::update_stream_options`], to avoid breaking the + /// invariants of [`IndexStore`]. pub(crate) fn get_stream(&self, alias: Atom) -> Option { self.stream_aliases.get(&alias).copied() } diff --git a/src/machine/streams.rs b/src/machine/streams.rs index ca348a0ef..b1ecba035 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -1476,6 +1476,10 @@ impl MachineState { } } + /// ## Warning + /// + /// The options of streams stored in `Machine::indices` should only + /// be modified through [`IndexStore::update_stream_options`]. 
pub(crate) fn get_stream_options( &mut self, alias: HeapCellValue, @@ -1591,6 +1595,19 @@ impl MachineState { options } + /// If `addr` is a [`Cons`](HeapCellValueTag::Cons) to a stream, then returns it. + /// + /// If it is an atom or a string, then this searches for the corresponding stream + /// inside of [`self.indices`], returning it. + /// + /// ## Warning + /// + /// **Do not directly modify [`stream.options_mut()`](Stream::options_mut) + /// on the returned stream.** + /// + /// Other functions rely on the invariants of [`IndexStore`], which may + /// become invalidated by the direct modification of a stream's option (namely, + /// its alias name). Instead, use [`IndexStore::update_stream_options`]. pub(crate) fn get_stream_or_alias( &mut self, addr: HeapCellValue, @@ -1953,14 +1970,40 @@ mod test { let mut machine = MachineBuilder::new().build(); let results = machine - .run_query(r#" + .run_query( + r#" \+ \+ ( open("README.md", read, S, [alias(readme)]), open(stream(S), read, _, [alias(another_alias)]), close(S) ), open("README.md", read, _, [alias(readme)]). - "#) + "#, + ) + .collect::<Vec<_>>(); + + assert_eq!(results.len(), 1); + assert!(results[0].is_ok()); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn close_realiased_user_output() { + let mut machine = MachineBuilder::new() + .with_streams(StreamConfig::in_memory()) + .build(); + + let results = machine + .run_query( + r#" + \+ \+ ( + open("README.md", read, S), + open(stream(S), read, _, [alias(user_output)]), + close(S) + ), + write(user_output, hello). + "#, + ) .collect::<Vec<_>>(); assert_eq!(results.len(), 1); assert!(results[0].is_ok()); From d8213e29c5a4a2e628bd2e15400165bf3c3e1049 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Thu, 6 Feb 2025 22:55:40 +0100 Subject: [PATCH 5/6] Fix set_output/1 and set_input/1 not updating the alias Before this change, the following set of queries would behave incorrectly: ``` ?- open("/tmp/out.log", write, S), set_output(S). prints(""), write("/tmp/out.log", "S = stream(...)"). 
?- write(user_output, hello). prints("hello"), unexpected. prints(""), write("/tmp/out.log", "hello"). % Expected, but not found. ``` Now, `set_output/1` and `set_input/1` properly bind the `user_output` and `user_input` aliases, making the queries above behave as expected. --- src/machine/system_calls.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 50c28549f..71e901822 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -5940,6 +5940,7 @@ impl Machine { } self.user_input = stream; + self.indices.set_stream(atom!("user_input"), stream); Ok(()) } @@ -5964,6 +5965,7 @@ impl Machine { } self.user_output = stream; + self.indices.set_stream(atom!("user_output"), stream); Ok(()) } From 2a218f34b91d6d5ab7f3c45c3523b03b2812b693 Mon Sep 17 00:00:00 2001 From: Emilie Burgun Date: Thu, 6 Feb 2025 23:47:22 +0100 Subject: [PATCH 6/6] Test corner cases of stream aliasing --- src/tests/stream-aliasing.pl | 19 +++++++++++++++++++ .../cli/src_tests/alias_dropped_stream.stdin | 2 ++ .../cli/src_tests/alias_dropped_stream.stdout | 1 + .../cli/src_tests/alias_dropped_stream.toml | 2 ++ .../cli/src_tests/realias_user_output.stdin | 2 ++ .../cli/src_tests/realias_user_output.stdout | 1 + .../cli/src_tests/realias_user_output.toml | 2 ++ .../cli/src_tests/set_output_alias.stderr | 1 + .../cli/src_tests/set_output_alias.stdin | 2 ++ .../cli/src_tests/set_output_alias.toml | 2 ++ 10 files changed, 34 insertions(+) create mode 100644 src/tests/stream-aliasing.pl create mode 100644 tests/scryer/cli/src_tests/alias_dropped_stream.stdin create mode 100644 tests/scryer/cli/src_tests/alias_dropped_stream.stdout create mode 100644 tests/scryer/cli/src_tests/alias_dropped_stream.toml create mode 100644 tests/scryer/cli/src_tests/realias_user_output.stdin create mode 100644 tests/scryer/cli/src_tests/realias_user_output.stdout create mode 100644 tests/scryer/cli/src_tests/realias_user_output.toml create 
mode 100644 tests/scryer/cli/src_tests/set_output_alias.stderr create mode 100644 tests/scryer/cli/src_tests/set_output_alias.stdin create mode 100644 tests/scryer/cli/src_tests/set_output_alias.toml diff --git a/src/tests/stream-aliasing.pl b/src/tests/stream-aliasing.pl new file mode 100644 index 000000000..3b4eb1ace --- /dev/null +++ b/src/tests/stream-aliasing.pl @@ -0,0 +1,19 @@ +% NOTE: the tests in this file will need to be changed once +% `open(stream(S1), read, _, NewOptions)` creates a new stream handle `S2` instead of updating +% the options in `S1`. In that case, that specific query will need to be updated +% to instead change the options of `S1`. + +alias_dropped_stream :- + open("README.md", read, S, [alias(readme)]), + open(stream(S), read, _, [alias(not_readme)]), + close(S), + stream_property(readme, file_name(_)). % Should throw an existence_error + +realias_user_output :- + current_output(S), + open(stream(S), read, _, [alias(not_user_output)]), + stream_property(S, alias(user_output)). % Should succeed + +set_output_alias :- + set_output(user_error), + write(user_output, hello). % Should write into stderr, not stdout diff --git a/tests/scryer/cli/src_tests/alias_dropped_stream.stdin b/tests/scryer/cli/src_tests/alias_dropped_stream.stdin new file mode 100644 index 000000000..39dd91345 --- /dev/null +++ b/tests/scryer/cli/src_tests/alias_dropped_stream.stdin @@ -0,0 +1,2 @@ +alias_dropped_stream. +halt. diff --git a/tests/scryer/cli/src_tests/alias_dropped_stream.stdout b/tests/scryer/cli/src_tests/alias_dropped_stream.stdout new file mode 100644 index 000000000..1291cbcc6 --- /dev/null +++ b/tests/scryer/cli/src_tests/alias_dropped_stream.stdout @@ -0,0 +1 @@ + error(existence_error(stream,readme),stream_property/0). 
diff --git a/tests/scryer/cli/src_tests/alias_dropped_stream.toml b/tests/scryer/cli/src_tests/alias_dropped_stream.toml new file mode 100644 index 000000000..06611a9a3 --- /dev/null +++ b/tests/scryer/cli/src_tests/alias_dropped_stream.toml @@ -0,0 +1,2 @@ +# Part of issue 2806 +args = ["-f", "--no-add-history", "src/tests/stream-aliasing.pl"] diff --git a/tests/scryer/cli/src_tests/realias_user_output.stdin b/tests/scryer/cli/src_tests/realias_user_output.stdin new file mode 100644 index 000000000..e6b5d912e --- /dev/null +++ b/tests/scryer/cli/src_tests/realias_user_output.stdin @@ -0,0 +1,2 @@ +realias_user_output. +halt. diff --git a/tests/scryer/cli/src_tests/realias_user_output.stdout b/tests/scryer/cli/src_tests/realias_user_output.stdout new file mode 100644 index 000000000..d5c487011 --- /dev/null +++ b/tests/scryer/cli/src_tests/realias_user_output.stdout @@ -0,0 +1 @@ + true. diff --git a/tests/scryer/cli/src_tests/realias_user_output.toml b/tests/scryer/cli/src_tests/realias_user_output.toml new file mode 100644 index 000000000..06611a9a3 --- /dev/null +++ b/tests/scryer/cli/src_tests/realias_user_output.toml @@ -0,0 +1,2 @@ +# Part of issue 2806 +args = ["-f", "--no-add-history", "src/tests/stream-aliasing.pl"] diff --git a/tests/scryer/cli/src_tests/set_output_alias.stderr b/tests/scryer/cli/src_tests/set_output_alias.stderr new file mode 100644 index 000000000..2b8b2f183 --- /dev/null +++ b/tests/scryer/cli/src_tests/set_output_alias.stderr @@ -0,0 +1 @@ +hello true. diff --git a/tests/scryer/cli/src_tests/set_output_alias.stdin b/tests/scryer/cli/src_tests/set_output_alias.stdin new file mode 100644 index 000000000..43ae91e00 --- /dev/null +++ b/tests/scryer/cli/src_tests/set_output_alias.stdin @@ -0,0 +1,2 @@ +set_output_alias. +halt. 
diff --git a/tests/scryer/cli/src_tests/set_output_alias.toml b/tests/scryer/cli/src_tests/set_output_alias.toml new file mode 100644 index 000000000..06611a9a3 --- /dev/null +++ b/tests/scryer/cli/src_tests/set_output_alias.toml @@ -0,0 +1,2 @@ +# Part of issue 2806 +args = ["-f", "--no-add-history", "src/tests/stream-aliasing.pl"]