Skip to content

Commit 6b4cf08

Browse files
authored
chore(query): enable to cache the previous python import directory for python udf (#19003)
* update * update * update
1 parent 7dab2f6 commit 6b4cf08

File tree

1 file changed

+100
-31
lines changed

1 file changed

+100
-31
lines changed

src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs

Lines changed: 100 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ use databend_common_sql::plans::UDFLanguage;
4343
use databend_common_sql::plans::UDFScriptCode;
4444
use databend_common_sql::plans::UDFType;
4545
use databend_common_storage::init_stage_operator;
46-
use tempfile::TempDir;
4746

47+
use self::venv::TempDir;
4848
use super::runtime_pool::Pool;
4949
use super::runtime_pool::RuntimeBuilder;
5050
use crate::physical_plans::UdfFunctionDesc;
@@ -60,7 +60,7 @@ static PY_VERSION: LazyLock<String> =
6060
LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string()));
6161

6262
impl ScriptRuntime {
63-
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option<Arc<TempDir>>) -> Result<Self> {
63+
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option<TempDir>) -> Result<Self> {
6464
let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else {
6565
unreachable!()
6666
};
@@ -103,7 +103,7 @@ if '{dir}' not in sys.path:
103103
dir = import_dir,
104104
));
105105

106-
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref());
106+
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir);
107107
if !stage_paths.is_empty() {
108108
script.push_str("for _databend_zip in (");
109109
for (idx, path) in stage_paths.iter().enumerate() {
@@ -122,7 +122,7 @@ if '{dir}' not in sys.path:
122122
};
123123

124124
if let Some(temp_dir) = &_temp_dir {
125-
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir.as_ref());
125+
let stage_paths = Self::collect_stage_sys_paths(func, temp_dir);
126126
if !stage_paths.is_empty() {
127127
log::info!(
128128
"Python UDF {:?} added stage artifacts to sys.path: {:?}",
@@ -589,7 +589,7 @@ impl Transform for TransformUdfScript {
589589
}
590590
}
591591

592-
type RuntimeTimeRes = BTreeMap<String, (Arc<ScriptRuntime>, Option<Arc<TempDir>>)>;
592+
type RuntimeTimeRes = BTreeMap<String, (Arc<ScriptRuntime>, Option<TempDir>)>;
593593

594594
impl TransformUdfScript {
595595
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<RuntimeTimeRes> {
@@ -621,7 +621,7 @@ impl TransformUdfScript {
621621
if let Some(entry) = w.get(&key) {
622622
Some(entry.materialize().map_err(ErrorCode::from_string)?)
623623
} else {
624-
let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?);
624+
let temp_dir = venv::create_venv(PY_VERSION.as_str())?;
625625
venv::install_deps(temp_dir.path(), &dependencies)?;
626626

627627
if !imports_stage_info.is_empty() {
@@ -878,7 +878,6 @@ mod venv {
878878
use databend_common_cache::MemSized;
879879
use parking_lot::Mutex;
880880
use parking_lot::RwLock;
881-
use tempfile::TempDir;
882881
use uuid::Uuid;
883882
use walkdir::WalkDir;
884883
use zip::write::FileOptions;
@@ -899,6 +898,86 @@ mod venv {
899898
base
900899
});
901900

901+
/// Root directory under which every Python UDF environment directory is created.
///
/// Resolved lazily on first access to `<system temp dir>/databend/python_udf_env`.
///
/// # Panics
///
/// Panics on first access if the directory cannot be created.
static PY_VENV_WORK_DIR: LazyLock<PathBuf> = LazyLock::new(|| {
    let mut work_dir = std::env::temp_dir();
    work_dir.push("databend");
    work_dir.push("python_udf_env");
    match fs::create_dir_all(&work_dir) {
        Ok(()) => work_dir,
        Err(e) => panic!("Failed to create python udf work dir {:?}: {}", work_dir, e),
    }
});
908+
909+
/// Cheaply clonable, reference-counted handle to a directory on disk.
///
/// Clones share one [`TempDirInner`]; the directory is removed by
/// `TempDirInner`'s `Drop` once the last handle goes away.
#[derive(Clone, Debug)]
pub struct TempDir {
    inner: Arc<TempDirInner>,
}

/// Shared state behind a [`TempDir`]: the path of the managed directory.
#[derive(Debug)]
struct TempDirInner {
    path: PathBuf,
}

/// Non-owning counterpart of [`TempDir`].
///
/// It does not keep the directory alive. The `Default` value points at
/// nothing and never upgrades.
#[derive(Debug, Default)]
struct WeakTempDir {
    inner: Weak<TempDirInner>,
}
922+
923+
impl TempDir {
924+
fn new_impl(path: PathBuf) -> Result<Self, String> {
925+
if path.exists() {
926+
fs::remove_dir_all(&path).map_err(|e| {
927+
format!("Failed to clean python udf temp dir {:?}: {}", path, e)
928+
})?;
929+
}
930+
fs::create_dir_all(&path)
931+
.map_err(|e| format!("Failed to create python udf temp dir {:?}: {}", path, e))?;
932+
Ok(Self {
933+
inner: Arc::new(TempDirInner { path }),
934+
})
935+
}
936+
937+
pub fn new() -> Result<Self, String> {
938+
let path = PY_VENV_WORK_DIR.join(Uuid::now_v7().to_string());
939+
Self::new_impl(path)
940+
}
941+
942+
pub fn new_with_path(path: PathBuf) -> Result<Self, String> {
943+
Self::new_impl(path)
944+
}
945+
946+
pub fn path(&self) -> &Path {
947+
&self.inner.path
948+
}
949+
950+
fn downgrade(&self) -> WeakTempDir {
951+
WeakTempDir {
952+
inner: Arc::downgrade(&self.inner),
953+
}
954+
}
955+
}
956+
957+
impl WeakTempDir {
958+
fn upgrade(&self) -> Option<TempDir> {
959+
self.inner.upgrade().map(|inner| TempDir { inner })
960+
}
961+
962+
fn replace(&mut self, temp_dir: &TempDir) {
963+
self.inner = Arc::downgrade(&temp_dir.inner);
964+
}
965+
}
966+
967+
impl Drop for TempDirInner {
968+
fn drop(&mut self) {
969+
if let Err(e) = fs::remove_dir_all(&self.path) {
970+
if !matches!(e.kind(), io::ErrorKind::NotFound) {
971+
log::warn!(
972+
"Failed to remove python udf temp dir {:?}: {}",
973+
self.path,
974+
e
975+
);
976+
}
977+
}
978+
}
979+
}
980+
902981
pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> {
903982
if deps.is_empty() {
904983
return Ok(());
@@ -963,32 +1042,19 @@ mod venv {
9631042
Ok(archive_path)
9641043
}
9651044

966-
pub fn restore_env(archive_path: &Path) -> Result<TempDir, String> {
967-
let temp_dir =
968-
tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?;
1045+
pub fn restore_env_into(archive_path: &Path, temp_dir_path: &Path) -> Result<(), String> {
9691046
let reader = File::open(archive_path)
9701047
.map_err(|e| format!("Failed to read python deps archive: {}", e))?;
9711048
let mut archive =
9721049
ZipArchive::new(reader).map_err(|e| format!("Failed to open archive: {}", e))?;
9731050
archive
974-
.extract(temp_dir.path())
1051+
.extract(temp_dir_path)
9751052
.map_err(|e| format!("Failed to extract python deps: {}", e))?;
976-
Ok(temp_dir)
1053+
Ok(())
9771054
}
9781055

9791056
pub fn create_venv(_python_version: &str) -> Result<TempDir, String> {
980-
let temp_dir =
981-
tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?;
982-
983-
// let env_path = temp_dir.path().join(".venv");
984-
// Command::new("python")
985-
// .args(["-m", "venv", env_path.to_str().unwrap()])
986-
// .stdout(std::process::Stdio::null())
987-
// .stderr(std::process::Stdio::null())
988-
// .status()
989-
// .map_err(|e| format!("Failed to create venv: {}", e))?;
990-
991-
Ok(temp_dir)
1057+
TempDir::new()
9921058
}
9931059

9941060
pub fn detect_python_version() -> Result<String, String> {
@@ -1017,7 +1083,8 @@ mod venv {
10171083
// Add this after the PY_VERSION LazyLock declaration
10181084
// An entry of the Python virtual environment cache: a weak handle to the unpacked directory plus the archive it can be restored from
10191085
pub(crate) struct PyVenvCacheEntry {
1020-
temp_dir: Mutex<Weak<TempDir>>,
1086+
temp_dir: Mutex<WeakTempDir>,
1087+
temp_dir_path: PathBuf,
10211088
archive_path: PathBuf,
10221089
}
10231090

@@ -1074,25 +1141,27 @@ mod venv {
10741141

10751142
impl MemSized for PyVenvCacheEntry {
10761143
fn mem_bytes(&self) -> usize {
1077-
std::mem::size_of::<Mutex<Weak<TempDir>>>() + std::mem::size_of::<PathBuf>()
1144+
std::mem::size_of::<Mutex<WeakTempDir>>() + 2 * std::mem::size_of::<PathBuf>()
10781145
}
10791146
}
10801147

10811148
impl PyVenvCacheEntry {
1082-
pub fn new(temp_dir: Arc<TempDir>, archive_path: PathBuf) -> Self {
1149+
pub fn new(temp_dir: TempDir, archive_path: PathBuf) -> Self {
10831150
Self {
1084-
temp_dir: Mutex::new(Arc::downgrade(&temp_dir)),
1151+
temp_dir: Mutex::new(temp_dir.downgrade()),
1152+
temp_dir_path: temp_dir.path().to_path_buf(),
10851153
archive_path,
10861154
}
10871155
}
10881156

1089-
pub fn materialize(&self) -> Result<Arc<TempDir>, String> {
1157+
pub fn materialize(&self) -> Result<TempDir, String> {
10901158
if let Some(existing) = self.temp_dir.lock().upgrade() {
10911159
return Ok(existing);
10921160
}
10931161

1094-
let temp_dir = Arc::new(restore_env(&self.archive_path)?);
1095-
*self.temp_dir.lock() = Arc::downgrade(&temp_dir);
1162+
let temp_dir = TempDir::new_with_path(self.temp_dir_path.clone())?;
1163+
restore_env_into(&self.archive_path, temp_dir.path())?;
1164+
self.temp_dir.lock().replace(&temp_dir);
10961165
Ok(temp_dir)
10971166
}
10981167
}

0 commit comments

Comments
(0)