Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,46 @@ impl Catalog {
};
table.collections.insert(version, new_global_id);

tx.update_item(id, new_entry.into())?;
for use_id in new_entry.referenced_by() {
let mut entry = state.get_entry(use_id).clone();
entry.item = entry.item.replace_item_refs(id, new_global_id);
tx.update_item(*use_id, entry.into())?;
}

let old_comment_id = CommentObjectId::Table(id);
let new_comment_id = CommentObjectId::Table(new_global_id);
if let Some(comments) = state.comments.get_object_comments(old_comment_id) {
tx.drop_comments(&[old_comment_id].into())?;
for (sub, comment) in comments {
tx.update_comment(new_comment_id, *sub, Some(comment.clone()))?;
}
}

let mz_catalog::durable::Item {
id,
oid,
global_id,
schema_id,
name,
create_sql,
owner_id,
privileges,
extra_versions,
} = new_entry.into();

tx.remove_item(id)?;
tx.insert_item(
new_global_id,
oid,
global_id,
schema_id,
&name,
create_sql,
owner_id,
privileges,
extra_versions,
)?;

storage_collections_to_register.insert(new_global_id, shard_id);
}
Op::CreateDatabase { name, owner_id } => {
Expand Down
3 changes: 2 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2362,7 +2362,8 @@ impl Coordinator {
CatalogItemId::System(id) => *id >= next_system_item_id,
CatalogItemId::User(id) => *id >= next_user_item_id,
CatalogItemId::IntrospectionSourceIndex(_)
| CatalogItemId::Transient(_) => false,
| CatalogItemId::Transient(_)
| CatalogItemId::Explain => false,
};
if id_too_large {
info!(
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5143,7 +5143,7 @@ impl Coordinator {

self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
Box::pin(async move {
let entry = coord.catalog().get_entry(&relation_id);
let entry = coord.catalog().get_entry(&new_global_id);
let CatalogItem::Table(table) = &entry.item else {
panic!("programming error, expected table found {:?}", entry.item);
};
Expand Down
2 changes: 1 addition & 1 deletion src/catalog-protos/protos/hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "3e9f4c62f87441ac7897d96462f3c0c9"
"md5": "d10b5b1e3f1b2fd6e717d77b1bf5b7d8"
},
{
"name": "objects_v67.proto",
Expand Down
1 change: 1 addition & 0 deletions src/catalog-protos/protos/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ message CatalogItemId {
uint64 user = 2;
uint64 transient = 3;
uint64 introspection_source_index = 4;
Empty explain = 5;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/catalog-protos/src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ impl RustType<crate::objects::CatalogItemId> for CatalogItemId {
CatalogItemId::Transient(x) => {
crate::objects::catalog_item_id::Value::Transient(*x)
}
CatalogItemId::Explain => {
crate::objects::catalog_item_id::Value::Explain(crate::objects::Empty {})
}
}),
}
}
Expand All @@ -604,6 +607,7 @@ impl RustType<crate::objects::CatalogItemId> for CatalogItemId {
Some(crate::objects::catalog_item_id::Value::Transient(x)) => {
Ok(CatalogItemId::Transient(x))
}
Some(crate::objects::catalog_item_id::Value::Explain(_)) => Ok(CatalogItemId::Explain),
None => Err(TryFromProtoError::missing_field("CatalogItemId::kind")),
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/src/durable/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ impl TryFrom<CatalogItemId> for SystemCatalogItemId {
CatalogItemId::IntrospectionSourceIndex(_) => Err("introspection_source_index"),
CatalogItemId::User(_) => Err("user"),
CatalogItemId::Transient(_) => Err("transient"),
CatalogItemId::Explain => Err("explain"),
}
}
}
Expand All @@ -662,6 +663,7 @@ impl TryFrom<CatalogItemId> for IntrospectionSourceIndexCatalogItemId {
}
CatalogItemId::User(_) => Err("user"),
CatalogItemId::Transient(_) => Err("transient"),
CatalogItemId::Explain => Err("explain"),
}
}
}
Expand Down
66 changes: 66 additions & 0 deletions src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2009,6 +2009,72 @@ impl CatalogItem {
}
}

pub fn replace_item_refs(&self, old_id: GlobalId, new_id: GlobalId) -> CatalogItem {
let do_rewrite = |create_sql: String| -> String {
let mut create_stmt = mz_sql::parse::parse(&create_sql)
.expect("invalid create sql persisted to catalog")
.into_element()
.ast;
mz_sql::ast::transform::create_stmt_replace_ids(
&mut create_stmt,
&[(old_id, new_id)].into(),
);
create_stmt.to_ast_string_stable()
};

match self {
CatalogItem::Table(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite);
CatalogItem::Table(i)
}
CatalogItem::Log(i) => CatalogItem::Log(i.clone()),
CatalogItem::Source(i) => {
let mut i = i.clone();
i.create_sql = i.create_sql.map(do_rewrite);
CatalogItem::Source(i)
}
CatalogItem::Sink(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::Sink(i)
}
CatalogItem::View(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::View(i)
}
CatalogItem::MaterializedView(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::MaterializedView(i)
}
CatalogItem::Index(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::Index(i)
}
CatalogItem::Secret(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::Secret(i)
}
CatalogItem::Func(_) | CatalogItem::Type(_) => {
unimplemented!()
}
CatalogItem::Connection(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::Connection(i)
}
CatalogItem::ContinualTask(i) => {
let mut i = i.clone();
i.create_sql = do_rewrite(i.create_sql);
CatalogItem::ContinualTask(i)
}
}
}

/// Updates the retain history for an item. Returns the previous retain history value. Returns
/// an error if this item does not support retain history.
pub fn update_retain_history(
Expand Down
1 change: 1 addition & 0 deletions src/repr/src/catalog_item_id.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ message ProtoCatalogItemId {
uint64 user = 2;
uint64 transient = 3;
uint64 introspection_source_index = 4;
google.protobuf.Empty explain = 5;
}
}
95 changes: 5 additions & 90 deletions src/repr/src/catalog_item_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt;
use std::str::FromStr;

use anyhow::{Error, anyhow};
use mz_lowertest::MzReflect;
use mz_proto::{RustType, TryFromProtoError};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

include!(concat!(env!("OUT_DIR"), "/mz_repr.catalog_item_id.rs"));

/// The identifier for an item within the Catalog.
#[derive(
Arbitrary,
Clone,
Copy,
Debug,
Eq,
PartialEq,
Ord,
PartialOrd,
Hash,
Serialize,
Deserialize,
MzReflect,
)]
pub enum CatalogItemId {
/// System namespace.
System(u64),
/// Introspection Source Index namespace.
IntrospectionSourceIndex(u64),
/// User namespace.
User(u64),
/// Transient item.
Transient(u64),
}

impl CatalogItemId {
/// Reports whether this ID is in the system namespace.
pub fn is_system(&self) -> bool {
matches!(
self,
CatalogItemId::System(_) | CatalogItemId::IntrospectionSourceIndex(_)
)
}

/// Reports whether this ID is in the user namespace.
pub fn is_user(&self) -> bool {
matches!(self, CatalogItemId::User(_))
}
use crate::GlobalId;

/// Reports whether this ID is for a transient item.
pub fn is_transient(&self) -> bool {
matches!(self, CatalogItemId::Transient(_))
}
}

impl FromStr for CatalogItemId {
type Err = Error;

fn from_str(mut s: &str) -> Result<Self, Self::Err> {
if s.len() < 2 {
return Err(anyhow!("couldn't parse id {}", s));
}
let tag = s.chars().next().unwrap();
s = &s[1..];
let variant = match tag {
's' => {
if Some('i') == s.chars().next() {
s = &s[1..];
CatalogItemId::IntrospectionSourceIndex
} else {
CatalogItemId::System
}
}
'u' => CatalogItemId::User,
't' => CatalogItemId::Transient,
_ => return Err(anyhow!("couldn't parse id {}", s)),
};
let val: u64 = s.parse()?;
Ok(variant(val))
}
}
include!(concat!(env!("OUT_DIR"), "/mz_repr.catalog_item_id.rs"));

impl fmt::Display for CatalogItemId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CatalogItemId::System(id) => write!(f, "s{}", id),
CatalogItemId::IntrospectionSourceIndex(id) => write!(f, "si{}", id),
CatalogItemId::User(id) => write!(f, "u{}", id),
CatalogItemId::Transient(id) => write!(f, "t{}", id),
}
}
}
pub type CatalogItemId = GlobalId;

impl RustType<ProtoCatalogItemId> for CatalogItemId {
fn into_proto(&self) -> ProtoCatalogItemId {
Expand All @@ -111,6 +24,7 @@ impl RustType<ProtoCatalogItemId> for CatalogItemId {
CatalogItemId::IntrospectionSourceIndex(x) => IntrospectionSourceIndex(*x),
CatalogItemId::User(x) => User(*x),
CatalogItemId::Transient(x) => Transient(*x),
CatalogItemId::Explain => Explain(()),
}),
}
}
Expand All @@ -122,6 +36,7 @@ impl RustType<ProtoCatalogItemId> for CatalogItemId {
Some(IntrospectionSourceIndex(x)) => Ok(CatalogItemId::IntrospectionSourceIndex(x)),
Some(User(x)) => Ok(CatalogItemId::User(x)),
Some(Transient(x)) => Ok(CatalogItemId::Transient(x)),
Some(Explain(())) => Ok(CatalogItemId::Explain),
None => Err(TryFromProtoError::missing_field("ProtoCatalogItemId::kind")),
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/repr/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub use crate::relation_and_scalar::ProtoScalarType;
pub use crate::relation_and_scalar::proto_scalar_type::ProtoRecordField;
use crate::role_id::RoleId;
use crate::row::DatumNested;
use crate::{CatalogItemId, ColumnName, DatumList, DatumMap, Row, RowArena, SqlColumnType};
use crate::{ColumnName, DatumList, DatumMap, GlobalId, Row, RowArena, SqlColumnType};

/// A single value.
///
Expand Down Expand Up @@ -1691,7 +1691,7 @@ pub enum SqlScalarType {
/// always be [`Datum::Null`].
List {
element_type: Box<SqlScalarType>,
custom_id: Option<CatalogItemId>,
custom_id: Option<GlobalId>,
},
/// An ordered and named sequence of datums.
Record {
Expand All @@ -1700,7 +1700,7 @@ pub enum SqlScalarType {
///
/// Boxed slice to reduce the size of the enum variant.
fields: Box<[(ColumnName, SqlColumnType)]>,
custom_id: Option<CatalogItemId>,
custom_id: Option<GlobalId>,
},
/// A PostgreSQL object identifier.
Oid,
Expand All @@ -1711,7 +1711,7 @@ pub enum SqlScalarType {
/// be [`Datum::Null`].
Map {
value_type: Box<SqlScalarType>,
custom_id: Option<CatalogItemId>,
custom_id: Option<GlobalId>,
},
/// A PostgreSQL function name.
RegProc,
Expand Down Expand Up @@ -3906,14 +3906,14 @@ impl Arbitrary for SqlScalarType {
leaf.prop_recursive(2, 3, 5, |inner| {
Union::new(vec![
// List
(inner.clone(), any::<Option<CatalogItemId>>())
(inner.clone(), any::<Option<GlobalId>>())
.prop_map(|(x, id)| SqlScalarType::List {
element_type: Box::new(x),
custom_id: id,
})
.boxed(),
// Map
(inner.clone(), any::<Option<CatalogItemId>>())
(inner.clone(), any::<Option<GlobalId>>())
.prop_map(|(x, id)| SqlScalarType::Map {
value_type: Box::new(x),
custom_id: id,
Expand All @@ -3935,7 +3935,7 @@ impl Arbitrary for SqlScalarType {
prop::collection::vec((any::<ColumnName>(), column_type_strat), 0..10);

// Now we combine it with the default strategies to get Records.
(fields_strat, any::<Option<CatalogItemId>>())
(fields_strat, any::<Option<GlobalId>>())
.prop_map(|(fields, custom_id)| SqlScalarType::Record {
fields: fields.into(),
custom_id,
Expand Down
7 changes: 7 additions & 0 deletions src/sql-parser/src/ast/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ impl RawItemName {
RawItemName::Id(_, name, _) => name,
}
}

pub fn id_mut(&mut self) -> Option<&mut String> {
match self {
RawItemName::Name(_) => None,
RawItemName::Id(id, _, _) => Some(id),
}
}
}

impl AstDisplay for RawItemName {
Expand Down
Loading