Skip to content

Commit bbe72e8

Browse files
committed
feat: add scan io metrics and auto increment helper
1 parent ac0fb76 commit bbe72e8

File tree

6 files changed

+190
-20
lines changed

6 files changed

+190
-20
lines changed

src/common/base/src/runtime/profile/profiles.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ pub enum ProfileStatisticsName {
3535
OutputRows,
3636
OutputBytes,
3737
ScanBytes,
38-
ScanCacheBytes,
3938
ScanPartitions,
39+
ScanBytesFromRemote,
40+
ScanBytesFromLocal,
41+
ScanBytesFromMemory,
4042

4143
RemoteSpillWriteCount,
4244
RemoteSpillWriteBytes,
@@ -190,20 +192,34 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
190192
unit: StatisticsUnit::Bytes,
191193
plain_statistics: true,
192194
}),
193-
(ProfileStatisticsName::ScanCacheBytes, ProfileDesc {
194-
display_name: "bytes scanned from cache",
195-
desc: "The bytes scanned from cache of query",
196-
index: ProfileStatisticsName::ScanCacheBytes as usize,
197-
unit: StatisticsUnit::Bytes,
198-
plain_statistics: true,
199-
}),
200195
(ProfileStatisticsName::ScanPartitions, ProfileDesc {
201196
display_name: "partitions scanned",
202197
desc: "The partitions scanned of query",
203198
index: ProfileStatisticsName::ScanPartitions as usize,
204199
unit: StatisticsUnit::Count,
205200
plain_statistics: true,
206201
}),
202+
(ProfileStatisticsName::ScanBytesFromRemote, ProfileDesc {
203+
display_name: "bytes scanned from remote",
204+
desc: "The bytes scanned from remote storage (compressed)",
205+
index: ProfileStatisticsName::ScanBytesFromRemote as usize,
206+
unit: StatisticsUnit::Bytes,
207+
plain_statistics: true,
208+
}),
209+
(ProfileStatisticsName::ScanBytesFromLocal, ProfileDesc {
210+
display_name: "bytes scanned from local cache",
211+
desc: "The bytes scanned from local disk cache (compressed)",
212+
index: ProfileStatisticsName::ScanBytesFromLocal as usize,
213+
unit: StatisticsUnit::Bytes,
214+
plain_statistics: true,
215+
}),
216+
(ProfileStatisticsName::ScanBytesFromMemory, ProfileDesc {
217+
display_name: "bytes scanned from memory cache",
218+
desc: "The bytes scanned from memory cache (compressed)",
219+
index: ProfileStatisticsName::ScanBytesFromMemory as usize,
220+
unit: StatisticsUnit::Bytes,
221+
plain_statistics: true,
222+
}),
207223
(ProfileStatisticsName::RemoteSpillWriteCount, ProfileDesc {
208224
display_name: "numbers remote spilled by write",
209225
desc: "The number of remote spilled by write",

src/query/sql/src/executor/format.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::HashMap;
1717
use databend_common_ast::ast::FormatTreeNode;
1818
use databend_common_base::base::format_byte_size;
1919
use databend_common_base::runtime::profile::get_statistics_desc;
20+
use databend_common_base::runtime::profile::ProfileStatisticsName;
2021
use databend_common_catalog::plan::PartStatistics;
2122
use databend_common_exception::ErrorCode;
2223
use databend_common_exception::Result;
@@ -612,14 +613,46 @@ fn append_profile_info(
612613
plan_id: u32,
613614
) {
614615
if let Some(prof) = profs.get(&plan_id) {
615-
for (_, desc) in get_statistics_desc().iter() {
616-
if prof.statistics[desc.index] != 0 {
617-
children.push(FormatTreeNode::new(format!(
616+
// Calculate total scan IO bytes for percentage
617+
let total_scan_io = prof.statistics[ProfileStatisticsName::ScanBytesFromRemote as usize]
618+
+ prof.statistics[ProfileStatisticsName::ScanBytesFromLocal as usize]
619+
+ prof.statistics[ProfileStatisticsName::ScanBytesFromMemory as usize];
620+
621+
for (stat_name, desc) in get_statistics_desc().iter() {
622+
let value = prof.statistics[desc.index];
623+
if value == 0 {
624+
continue;
625+
}
626+
627+
// Add percentage for cache-related statistics
628+
let display_text = if total_scan_io > 0 {
629+
match stat_name {
630+
ProfileStatisticsName::ScanBytesFromRemote
631+
| ProfileStatisticsName::ScanBytesFromLocal
632+
| ProfileStatisticsName::ScanBytesFromMemory => {
633+
let percentage = (value as f64 / total_scan_io as f64) * 100.0;
634+
format!(
635+
"{}: {} ({:.2}%)",
636+
desc.display_name.to_lowercase(),
637+
desc.human_format(value),
638+
percentage
639+
)
640+
}
641+
_ => format!(
642+
"{}: {}",
643+
desc.display_name.to_lowercase(),
644+
desc.human_format(value)
645+
),
646+
}
647+
} else {
648+
format!(
618649
"{}: {}",
619650
desc.display_name.to_lowercase(),
620-
desc.human_format(prof.statistics[desc.index])
621-
)));
622-
}
651+
desc.human_format(value)
652+
)
653+
};
654+
655+
children.push(FormatTreeNode::new(display_text));
623656
}
624657
}
625658
}

src/query/storages/common/cache/src/providers/disk_cache_builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ impl CacheAccessor for DiskCacheAccessor {
118118
metrics_inc_cache_access_count(1, &self.name);
119119
let k = k.as_ref();
120120
if let Some(item) = self.lru_disk_cache.get(k) {
121-
Profile::record_usize_profile(ProfileStatisticsName::ScanCacheBytes, item.len());
121+
let size = item.len();
122+
Profile::record_usize_profile(ProfileStatisticsName::ScanBytesFromLocal, size);
122123
metrics_inc_cache_hit_count(1, &self.name);
123124
Some(item)
124125
} else {

src/query/storages/common/cache/src/providers/hybrid_cache.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,14 @@ where
152152

153153
fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<Self::V>> {
154154
if let Some(item) = self.memory_cache.get(k.as_ref()) {
155+
// Record memory cache hit
156+
// Note: The actual size recording happens in memory_cache.get()
155157
// try putting it back to on-disk cache if necessary
156158
self.insert_to_disk_cache_if_necessary(k.as_ref(), item.as_ref());
157159
Some(item)
158160
} else if let Some(bytes) = self.disk_cache.get(k.as_ref()) {
161+
// Record disk cache hit
162+
// Note: The actual size recording happens in disk_cache.get()
159163
let bytes = bytes.as_ref().clone();
160164
match bytes.try_into() {
161165
Ok(v) => Some(self.memory_cache.insert(k.as_ref().to_owned(), v)),
@@ -169,7 +173,7 @@ where
169173
}
170174
}
171175
} else {
172-
// Cache Miss
176+
// Cache Miss - will need to read from remote
173177
None
174178
}
175179
}

src/query/storages/common/cache/src/providers/memory_cache.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ impl<V: Into<CacheValue<V>>> InMemoryLruCache<V> {
7979
mod impls {
8080
use std::sync::Arc;
8181

82+
use databend_common_base::runtime::profile::Profile;
83+
use databend_common_base::runtime::profile::ProfileStatisticsName;
84+
use databend_common_cache::MemSized;
8285
use databend_common_metrics::cache::metrics_inc_cache_access_count;
8386
use databend_common_metrics::cache::metrics_inc_cache_hit_count;
8487
use databend_common_metrics::cache::metrics_inc_cache_miss_bytes;
@@ -93,16 +96,24 @@ mod impls {
9396

9497
fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<V>> {
9598
metrics_inc_cache_access_count(1, self.name());
96-
let v = {
99+
let (v, mem_bytes) = {
97100
let mut guard = self.inner.write();
98-
guard
99-
.get(k.as_ref())
100-
.map(|cache_value: &CacheValue<V>| cache_value.get_inner())
101+
match guard.get(k.as_ref()) {
102+
Some(cache_value) => {
103+
(Some(cache_value.get_inner()), Some(cache_value.mem_bytes()))
104+
}
105+
None => (None, None),
106+
}
101107
};
108+
102109
if v.is_none() {
103110
metrics_inc_cache_miss_count(1, &self.name);
104111
} else {
105112
metrics_inc_cache_hit_count(1, &self.name);
113+
// Record bytes scanned from memory cache
114+
if let Some(size) = mem_bytes {
115+
Profile::record_usize_profile(ProfileStatisticsName::ScanBytesFromMemory, size);
116+
}
106117
}
107118
v
108119
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use ahash::AHashMap;
18+
use databend_common_exception::Result;
19+
use databend_common_expression::Scalar;
20+
use databend_common_expression::ScalarRef;
21+
use databend_common_expression::Value;
22+
use parking_lot::RwLock;
23+
24+
use super::meta::UniqueKeyDigest;
25+
26+
/// Where the value for an auto-increment column is taken from.
///
/// NOTE(review): the exact index semantics (key side vs. remaining columns)
/// are inferred from the variant names — confirm against the call sites.
#[derive(Clone, Debug)]
pub enum AutoIncValueSource {
    // Index into the key columns.
    Key(usize),
    // Index into the remaining (non-key) columns.
    Remain(usize),
}
31+
32+
/// Describes one auto-increment column that needs its value replaced.
#[derive(Clone, Debug)]
pub struct AutoIncColumn {
    // Position of the column in the output schema.
    // NOTE(review): assumed to index the output block's fields — confirm.
    pub field_index: usize,
    // Where the replacement value is read from.
    pub source: AutoIncValueSource,
}
37+
38+
/// Store of recorded auto-increment values, keyed by a row's unique-key
/// digest. Constructed behind an `Arc` (see `try_new`) so it can be shared;
/// the inner map is guarded by an `RwLock` for concurrent access.
#[derive(Clone, Debug)]
pub struct AutoIncrementReplacer {
    // The auto-increment columns this replacer is responsible for.
    columns: Vec<AutoIncColumn>,
    // Recorded values per unique-key digest; each entry holds one Scalar
    // per element of `columns` (enforced by `record`).
    values: Arc<RwLock<AHashMap<UniqueKeyDigest, Vec<Scalar>>>>,
}
43+
44+
impl AutoIncrementReplacer {
45+
pub fn try_new(columns: Vec<AutoIncColumn>) -> Option<Arc<Self>> {
46+
if columns.is_empty() {
47+
return None;
48+
}
49+
Some(Arc::new(Self {
50+
columns,
51+
values: Arc::new(RwLock::new(AHashMap::default())),
52+
}))
53+
}
54+
55+
pub fn columns(&self) -> &[AutoIncColumn] {
56+
&self.columns
57+
}
58+
59+
pub fn record(&self, hash: UniqueKeyDigest, row_values: Vec<Scalar>) -> Result<()> {
60+
if row_values.len() != self.columns.len() {
61+
return Err(databend_common_exception::ErrorCode::Internal(format!(
62+
"auto increment replacement expects {} values, got {}",
63+
self.columns.len(),
64+
row_values.len()
65+
)));
66+
}
67+
self.values.write().insert(hash, row_values);
68+
Ok(())
69+
}
70+
71+
pub fn take(&self, hash: &UniqueKeyDigest) -> Option<Vec<Scalar>> {
72+
self.values.write().remove(hash)
73+
}
74+
}
75+
76+
pub fn scalar_ref_to_owned(scalar_ref: ScalarRef) -> Scalar {
77+
match scalar_ref {
78+
ScalarRef::Null => Scalar::Null,
79+
ScalarRef::EmptyArray => Scalar::EmptyArray,
80+
ScalarRef::EmptyMap => Scalar::EmptyMap,
81+
ScalarRef::Number(n) => Scalar::Number(n),
82+
ScalarRef::Decimal(d) => Scalar::Decimal(d),
83+
ScalarRef::Boolean(b) => Scalar::Boolean(b),
84+
ScalarRef::Binary(v) => Scalar::Binary(v.to_vec()),
85+
ScalarRef::String(v) => Scalar::String(v.to_string()),
86+
ScalarRef::Timestamp(t) => Scalar::Timestamp(t),
87+
ScalarRef::TimestampTz(t) => Scalar::TimestampTz(t),
88+
ScalarRef::Date(d) => Scalar::Date(d),
89+
ScalarRef::Interval(val) => Scalar::Interval(val),
90+
ScalarRef::Array(col) => Scalar::Array(col),
91+
ScalarRef::Map(col) => Scalar::Map(col),
92+
ScalarRef::Bitmap(v) => Scalar::Bitmap(v.to_vec()),
93+
ScalarRef::Tuple(items) => Scalar::Tuple(items.into_iter().map(scalar_ref_to_owned).collect()),
94+
ScalarRef::Variant(v) => Scalar::Variant(v.to_vec()),
95+
ScalarRef::Geometry(v) => Scalar::Geometry(v.to_vec()),
96+
ScalarRef::Geography(v) => Scalar::Geography(v.into()),
97+
ScalarRef::Vector(v) => Scalar::Vector(v.to_owned()),
98+
ScalarRef::Opaque(v) => Scalar::Opaque(v.to_owned()),
99+
}
100+
}
101+
102+
pub fn value_row_scalar(value: &Value<databend_common_expression::types::AnyType>, row: usize) -> Result<Scalar> {
103+
let scalar_ref = value.row_scalar(row)?;
104+
Ok(scalar_ref_to_owned(scalar_ref))
105+
}

0 commit comments

Comments
 (0)