From aa31a5e46efd2960d58eb92027f63826763d6933 Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Tue, 25 Nov 2025 17:54:09 +0000 Subject: [PATCH 1/9] vhd-tool: Add read_headers command for determining allocated blocks Currently, vhd-tool provides several "hybrid" modes where it exports into vhd from raw, using the information from the VHD bitmaps to determine which blocks and sectors contain data (to avoid reading zero blocks). Other tools are also handling VHD-backed VDIs (we are exporting them as part of XVA export, and now they can also be exported to QCOW), and currently they have to read the whole raw disk. Instead provide a read_headers command which provides data on allocated clusters for other tools to use, allowing them to speed up handling sparse VDIs. It uses a new blocks_json function in Vhd_format. Signed-off-by: Andrii Sultanov --- ocaml/libs/vhd/vhd_format/dune | 3 ++- ocaml/libs/vhd/vhd_format/f.ml | 33 +++++++++++++++++++++++++++++++++ ocaml/libs/vhd/vhd_format/f.mli | 2 ++ ocaml/vhd-tool/cli/main.ml | 22 +++++++++++++++++++++- ocaml/vhd-tool/src/impl.ml | 8 ++++++++ ocaml/vhd-tool/src/impl.mli | 3 +++ 6 files changed, 69 insertions(+), 2 deletions(-) diff --git a/ocaml/libs/vhd/vhd_format/dune b/ocaml/libs/vhd/vhd_format/dune index bafac365189..eea7d1960a0 100644 --- a/ocaml/libs/vhd/vhd_format/dune +++ b/ocaml/libs/vhd/vhd_format/dune @@ -2,5 +2,6 @@ (name vhd_format) (public_name vhd-format) (flags :standard -w -32-34-37) - (libraries stdlib-shims (re_export bigarray-compat) cstruct io-page rresult unix uuidm) + (libraries stdlib-shims (re_export bigarray-compat) cstruct io-page rresult + unix uuidm yojson) (preprocess (pps ppx_cstruct))) diff --git a/ocaml/libs/vhd/vhd_format/f.ml b/ocaml/libs/vhd/vhd_format/f.ml index 79128d0035f..d12329bd760 100644 --- a/ocaml/libs/vhd/vhd_format/f.ml +++ b/ocaml/libs/vhd/vhd_format/f.ml @@ -2883,6 +2883,37 @@ functor let raw ?from (vhd : fd Vhd.t) = raw_common ?from vhd + let vhd_blocks_to_json (t : fd Vhd.t) = + let block_size_sectors_shift = + t.Vhd.header.Header.block_size_sectors_shift + in + let max_table_entries = Vhd.used_max_table_entries t in + + let include_block = include_block None t in + + let blocks = + Seq.init max_table_entries Fun.id + |> Seq.filter_map (fun i -> + if include_block i then + Some (`Int i) + else + None + ) + |> List.of_seq + in + let json = + `Assoc + [ + ( "virtual_size" + , `Int (Int64.to_int t.Vhd.footer.Footer.current_size) + ) + ; ("cluster_bits", `Int (block_size_sectors_shift + sector_shift)) + ; ("data_clusters", `List blocks) + ] + in + let json_string = Yojson.to_string json in + print_string json_string ; return () + let vhd_common ?from ?raw ?(emit_batmap = false) (t : fd Vhd.t) = let block_size_sectors_shift = t.Vhd.header.Header.block_size_sectors_shift @@ -3119,6 +3150,8 @@ functor let vhd ?from (raw : 'a) (vhd : fd Vhd.t) = Vhd_input.vhd_common ?from ~raw vhd + + let blocks_json = Vhd_input.vhd_blocks_to_json end (* Create a VHD stream from data on t, using `include_block` guide us which blocks have data *) diff --git a/ocaml/libs/vhd/vhd_format/f.mli b/ocaml/libs/vhd/vhd_format/f.mli index ae300a9c95e..40de5f9d902 100644 --- a/ocaml/libs/vhd/vhd_format/f.mli +++ b/ocaml/libs/vhd/vhd_format/f.mli @@ -474,6 +474,8 @@ module From_file : functor (F : S.FILE) -> sig copies from the virtual disk [raw]. If [from] is provided then the stream will contain only the virtual updates required to transform [from] into [t] *) + + val blocks_json : fd Vhd.t -> unit t end module Raw_input : sig diff --git a/ocaml/vhd-tool/cli/main.ml b/ocaml/vhd-tool/cli/main.ml index c163763c9df..fb950be35e0 100644 --- a/ocaml/vhd-tool/cli/main.ml +++ b/ocaml/vhd-tool/cli/main.ml @@ -385,9 +385,29 @@ let stream_cmd = , Cmd.info "stream" ~sdocs:_common_options ~doc ~man ) +let read_headers_cmd = + let doc = + "Parse VHD headers and output allocated blocks information in JSON format \ + like: {'virtual_size': X, 'cluster_bits: X, 'data_clusters: [1,2,3]}" + in + let source = + let doc = Printf.sprintf "Path to the VHD file" in + Arg.(required & pos 0 (some file) None & info [] ~doc) + in + ( Term.(ret (const Impl.read_headers $ common_options_t $ source)) + , Cmd.info "read_headers" ~sdocs:_common_options ~doc + ) + let cmds = [ - info_cmd; contents_cmd; get_cmd; create_cmd; check_cmd; serve_cmd; stream_cmd + info_cmd + ; contents_cmd + ; get_cmd + ; create_cmd + ; check_cmd + ; serve_cmd + ; stream_cmd + ; read_headers_cmd ] |> List.map (fun (t, i) -> Cmd.v i t) diff --git a/ocaml/vhd-tool/src/impl.ml b/ocaml/vhd-tool/src/impl.ml index d067846f565..9dda493a0a2 100644 --- a/ocaml/vhd-tool/src/impl.ml +++ b/ocaml/vhd-tool/src/impl.ml @@ -1164,6 +1164,14 @@ let stream_t common args ?(progress = no_progress_bar) () = args.StreamCommon.tar_filename_prefix args.StreamCommon.good_ciphersuites args.StreamCommon.verify_cert +let read_headers common source = + let path = [Filename.dirname source] in + let thread = + retry common 3 (fun () -> Vhd_IO.openchain ~path source false) >>= fun t -> + Vhd_IO.close t >>= fun () -> Hybrid_input.blocks_json t + in + Lwt_main.run thread ; `Ok () + let stream common args = try Vhd_format_lwt.File.use_unbuffered := common.Common.unbuffered ; diff --git a/ocaml/vhd-tool/src/impl.mli b/ocaml/vhd-tool/src/impl.mli index 2ffa08da6ce..13fe7ba6853 100644 --- a/ocaml/vhd-tool/src/impl.mli +++ b/ocaml/vhd-tool/src/impl.mli @@ -35,6 +35,9 @@ val check : val stream : Common.t -> StreamCommon.t -> [> `Error of bool * string | `Ok of unit] +val read_headers : + Common.t -> string -> [> `Error of bool * string | `Ok of unit] + val serve : Common.t -> string From 37f8b66ef2272d32d1b04106c27f116d3a20da1e Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Fri, 21 Nov 2025 09:58:52 +0000 Subject: [PATCH 2/9] stream_vdi: Factor out send_one into a top-level function The body has less indentation this way Signed-off-by: Andrii Sultanov --- ocaml/xapi/stream_vdi.ml | 264 ++++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 131 deletions(-) diff --git a/ocaml/xapi/stream_vdi.ml b/ocaml/xapi/stream_vdi.ml index 84765aeaf7b..f18a7c17983 100644 --- a/ocaml/xapi/stream_vdi.ml +++ b/ocaml/xapi/stream_vdi.ml @@ -251,143 +251,145 @@ let get_chunk_numbers_in_increasing_order descriptor_list offset = let chunks = process [] offset descriptor_list in List.rev chunks +let send_one ofd (__context : Context.t) rpc session_id progress refresh_session + (prefix, vdi_ref, _size) = + let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in + let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in + with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 + (fun ifd dom0_path -> + match get_nbd_device dom0_path with + | None -> + (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) + let last_transmission_time = ref 0. in + (* NB. It used to be that chunks could be larger than a native int *) + (* could handle, but this is no longer the case! Ensure all chunks *) + (* are strictly less than 2^30 bytes *) + let rec stream_from (chunk_no : int) (offset : int64) = + refresh_session () ; + let remaining = Int64.sub size offset in + if remaining > 0L then ( + let this_chunk = min remaining chunk_size in + let last_chunk = this_chunk = remaining in + let this_chunk = Int64.to_int this_chunk in + let filename = Printf.sprintf "%s/%08d" prefix chunk_no in + let now = Unix.gettimeofday () in + let time_since_transmission = now -. !last_transmission_time in + (* We always include the first and last blocks *) + let first_or_last = chunk_no = 0 || last_chunk in + if time_since_transmission > 5. && not first_or_last then ( + last_transmission_time := now ; + write_block ~__context filename Bytes.empty ofd 0 ; + (* no progress has been made *) + stream_from (chunk_no + 1) offset + ) else + let buffer = + if Int64.of_int this_chunk = chunk_size then + reusable_buffer + else + Bytes.make this_chunk '\000' + in + Unixext.really_read ifd buffer 0 this_chunk ; + if + first_or_last + || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) + then ( + last_transmission_time := now ; + write_block ~__context filename buffer ofd this_chunk + ) ; + made_progress __context progress (Int64.of_int this_chunk) ; + stream_from (chunk_no + 1) (Int64.add offset chunk_size) + ) + in + stream_from 0 0L + | Some (path, exportname) -> + let last_transmission_time = ref 0L in + let actually_write_chunk (this_chunk_no : int) (this_chunk_size : int) + = + let buffer = + if this_chunk_size = Int64.to_int chunk_size then + reusable_buffer + else + Bytes.make this_chunk_size '\000' + in + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in + Unix.LargeFile.lseek ifd + (Int64.mul (Int64.of_int this_chunk_no) chunk_size) + Unix.SEEK_SET + |> ignore ; + Unixext.really_read ifd buffer 0 this_chunk_size ; + last_transmission_time := Mtime_clock.now_ns () ; + write_block ~__context filename buffer ofd this_chunk_size ; + made_progress __context progress (Int64.of_int this_chunk_size) + in + let rec stream_from_offset (offset : int64) = + let remaining = Int64.sub size offset in + if remaining > 0L then ( + let this_chunk_size = + min (Int64.to_int chunk_size) (Int64.to_int remaining) + in + let this_chunk_no = Int64.div offset chunk_size in + let now = Mtime_clock.now_ns () in + let time_since_transmission = + Int64.sub now !last_transmission_time + in + if + offset = 0L + || remaining <= chunk_size + || time_since_transmission > 5000000000L + then ( + actually_write_chunk + (Int64.to_int this_chunk_no) + this_chunk_size ; + stream_from_offset + (Int64.add offset (Int64.of_int this_chunk_size)) + ) else + let remaining = + Int64.mul + (Int64.div (Int64.sub remaining 1L) chunk_size) + chunk_size + in + (* Get sparseness for next 10GB or until the end rounded by last chunk, whichever is smaller *) + let sparseness_size = min max_sparseness_size remaining in + let output, _ = + Forkhelpers.execute_command_get_output + Xapi_globs.get_nbd_extents + [ + "--path" + ; path + ; "--exportname" + ; exportname + ; "--offset" + ; Int64.to_string offset + ; "--length" + ; Int64.to_string sparseness_size + ] + in + let extents = extent_list_of_rpc (Jsonrpc.of_string output) in + let chunks = + get_chunk_numbers_in_increasing_order extents offset + in + List.iter + (fun chunk -> + actually_write_chunk (Int64.to_int chunk) + (Int64.to_int chunk_size) + ) + chunks ; + stream_from_offset (Int64.add offset sparseness_size) + ) + in + stream_from_offset 0L + ) ; + debug "Finished streaming VDI" + (** Stream a set of VDIs split into chunks in a tar format in a defined order. Return an association list mapping tar filename -> string (containing the SHA1 checksums) *) let send_all refresh_session ofd ~__context rpc session_id (prefix_vdis : vdi list) = TaskHelper.set_cancellable ~__context ; let progress = new_progress_record __context prefix_vdis in - let send_one ofd (__context : Context.t) (prefix, vdi_ref, _size) = - let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in - let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in - with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 - (fun ifd dom0_path -> - match get_nbd_device dom0_path with - | None -> - (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) - let last_transmission_time = ref 0. in - (* NB. It used to be that chunks could be larger than a native int *) - (* could handle, but this is no longer the case! Ensure all chunks *) - (* are strictly less than 2^30 bytes *) - let rec stream_from (chunk_no : int) (offset : int64) = - refresh_session () ; - let remaining = Int64.sub size offset in - if remaining > 0L then ( - let this_chunk = min remaining chunk_size in - let last_chunk = this_chunk = remaining in - let this_chunk = Int64.to_int this_chunk in - let filename = Printf.sprintf "%s/%08d" prefix chunk_no in - let now = Unix.gettimeofday () in - let time_since_transmission = now -. !last_transmission_time in - (* We always include the first and last blocks *) - let first_or_last = chunk_no = 0 || last_chunk in - if time_since_transmission > 5. && not first_or_last then ( - last_transmission_time := now ; - write_block ~__context filename Bytes.empty ofd 0 ; - (* no progress has been made *) - stream_from (chunk_no + 1) offset - ) else - let buffer = - if Int64.of_int this_chunk = chunk_size then - reusable_buffer - else - Bytes.make this_chunk '\000' - in - Unixext.really_read ifd buffer 0 this_chunk ; - if - first_or_last - || not - (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) - then ( - last_transmission_time := now ; - write_block ~__context filename buffer ofd this_chunk - ) ; - made_progress __context progress (Int64.of_int this_chunk) ; - stream_from (chunk_no + 1) (Int64.add offset chunk_size) - ) - in - stream_from 0 0L - | Some (path, exportname) -> - let last_transmission_time = ref 0L in - let actually_write_chunk (this_chunk_no : int) - (this_chunk_size : int) = - let buffer = - if this_chunk_size = Int64.to_int chunk_size then - reusable_buffer - else - Bytes.make this_chunk_size '\000' - in - let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in - Unix.LargeFile.lseek ifd - (Int64.mul (Int64.of_int this_chunk_no) chunk_size) - Unix.SEEK_SET - |> ignore ; - Unixext.really_read ifd buffer 0 this_chunk_size ; - last_transmission_time := Mtime_clock.now_ns () ; - write_block ~__context filename buffer ofd this_chunk_size ; - made_progress __context progress (Int64.of_int this_chunk_size) - in - let rec stream_from_offset (offset : int64) = - let remaining = Int64.sub size offset in - if remaining > 0L then ( - let this_chunk_size = - min (Int64.to_int chunk_size) (Int64.to_int remaining) - in - let this_chunk_no = Int64.div offset chunk_size in - let now = Mtime_clock.now_ns () in - let time_since_transmission = - Int64.sub now !last_transmission_time - in - if - offset = 0L - || remaining <= chunk_size - || time_since_transmission > 5000000000L - then ( - actually_write_chunk - (Int64.to_int this_chunk_no) - this_chunk_size ; - stream_from_offset - (Int64.add offset (Int64.of_int this_chunk_size)) - ) else - let remaining = - Int64.mul - (Int64.div (Int64.sub remaining 1L) chunk_size) - chunk_size - in - (* Get sparseness for next 10GB or until the end rounded by last chunk, whichever is smaller *) - let sparseness_size = min max_sparseness_size remaining in - let output, _ = - Forkhelpers.execute_command_get_output - Xapi_globs.get_nbd_extents - [ - "--path" - ; path - ; "--exportname" - ; exportname - ; "--offset" - ; Int64.to_string offset - ; "--length" - ; Int64.to_string sparseness_size - ] - in - let extents = extent_list_of_rpc (Jsonrpc.of_string output) in - let chunks = - get_chunk_numbers_in_increasing_order extents offset - in - List.iter - (fun chunk -> - actually_write_chunk (Int64.to_int chunk) - (Int64.to_int chunk_size) - ) - chunks ; - stream_from_offset (Int64.add offset sparseness_size) - ) - in - stream_from_offset 0L - ) ; - debug "Finished streaming VDI" - in - for_each_vdi __context (send_one ofd __context) prefix_vdis + for_each_vdi __context + (send_one ofd __context rpc session_id progress refresh_session) + prefix_vdis exception Invalid_checksum of string From 3d36a04ef1ec54db957920daf4529da6f3b62c0c Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Mon, 24 Nov 2025 09:01:11 +0000 Subject: [PATCH 3/9] xapi_vdi_helpers: Move backing_file_of_device from vhd_tool_wrapper This allows using it in stream_vdi and qcow_tool_wrapper without introducing a dependency cycle. Signed-off-by: Andrii Sultanov --- ocaml/xapi/dune | 1 + ocaml/xapi/qcow_tool_wrapper.ml | 2 +- ocaml/xapi/stream_vdi.ml | 59 +-------------- ocaml/xapi/vhd_tool_wrapper.ml | 76 +------------------ ocaml/xapi/xapi_vdi_helpers.ml | 126 ++++++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 131 deletions(-) diff --git a/ocaml/xapi/dune b/ocaml/xapi/dune index 8095a5c4bfc..0a9b1099c0f 100644 --- a/ocaml/xapi/dune +++ b/ocaml/xapi/dune @@ -259,6 +259,7 @@ Storage_mux Storage_smapiv1_wrapper Stream_vdi + Xapi_vdi_helpers System_domains Xapi_psr Xapi_services diff --git a/ocaml/xapi/qcow_tool_wrapper.ml b/ocaml/xapi/qcow_tool_wrapper.ml index e3cd13d469b..3957bbe4864 100644 --- a/ocaml/xapi/qcow_tool_wrapper.ml +++ b/ocaml/xapi/qcow_tool_wrapper.ml @@ -79,7 +79,7 @@ let parse_header qcow_path = let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) (_size : Int64.t) = let qcow_of_device = - Vhd_tool_wrapper.backing_file_of_device ~driver:"qcow2" + Xapi_vdi_helpers.backing_file_of_device ~driver:"qcow2" in let qcow_path = qcow_of_device path in diff --git a/ocaml/xapi/stream_vdi.ml b/ocaml/xapi/stream_vdi.ml index f18a7c17983..3ce924a6417 100644 --- a/ocaml/xapi/stream_vdi.ml +++ b/ocaml/xapi/stream_vdi.ml @@ -131,63 +131,6 @@ let write_block ~__context filename buffer ofd len = else raise e -let get_device_numbers path = - let rdev = (Unix.LargeFile.stat path).Unix.LargeFile.st_rdev in - let major = rdev / 256 and minor = rdev mod 256 in - (major, minor) - -let is_nbd_device path = - let nbd_device_num = 43 in - let major, _ = get_device_numbers path in - major = nbd_device_num - -type nbd_connect_info = {path: string; exportname: string} [@@deriving rpc] - -let get_nbd_device path = - let nbd_device_prefix = "/dev/nbd" in - if - Astring.String.is_prefix ~affix:nbd_device_prefix path && is_nbd_device path - then - let nbd_number = - Astring.String.with_range ~first:(String.length nbd_device_prefix) path - in - let {path; exportname} = - (* persistent_nbd_info_dir is written from nbd_client_manager.py as part of VBD plug*) - let persistent_nbd_info_dir = "/var/run/nonpersistent/nbd" in - let filename = persistent_nbd_info_dir ^ "/" ^ nbd_number in - Xapi_stdext_unix.Unixext.string_of_file filename - |> Jsonrpc.of_string - |> nbd_connect_info_of_rpc - in - Some (path, exportname) - else - None - -(* Copied from vhd-tool/src/image.ml. - * Just keep the situation of xapi doesn't depend on vhd-tool OCaml module. - *) -let image_behind_nbd_device = function - | Some (path, _exportname) as image -> - (* The nbd server path exposed by tapdisk can lead us to the actual image - file below. Following the symlink gives a path like - `/run/blktap-control/nbd.`, - containing the tapdisk pid and minor number. Using this information, - we can get the file path from tap-ctl. - *) - let default _ _ = image in - let filename = Unix.realpath path |> Filename.basename in - Scanf.ksscanf filename default "nbd%d.%d" (fun pid minor -> - match Tapctl.find (Tapctl.create ()) ~pid ~minor with - | _, _, Some ("vhd", vhd) -> - Some ("vhd", vhd) - | _, _, Some ("aio", vhd) -> - Some ("raw", vhd) - | _, _, _ | (exception _) -> - None - ) - | _ -> - None - type extent = {flags: int32; length: int64} [@@deriving rpc] type extent_list = extent list [@@deriving rpc] @@ -257,7 +200,7 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 (fun ifd dom0_path -> - match get_nbd_device dom0_path with + match Xapi_vdi_helpers.get_nbd_device dom0_path with | None -> (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) let last_transmission_time = ref 0. in diff --git a/ocaml/xapi/vhd_tool_wrapper.ml b/ocaml/xapi/vhd_tool_wrapper.ml index f3f791fe251..c3460381cb1 100644 --- a/ocaml/xapi/vhd_tool_wrapper.ml +++ b/ocaml/xapi/vhd_tool_wrapper.ml @@ -18,7 +18,6 @@ module D = Debug.Make (struct let name = "vhd_tool_wrapper" end) open D -open Xapi_stdext_std.Xstringext (* .vhds on XenServer are sometimes found via /dev/mapper *) let vhd_search_path = "/dev/mapper:." @@ -113,81 +112,14 @@ let receive progress_cb format protocol (s : Unix.file_descr) in run_vhd_tool progress_cb args s s' path -(** [find_backend_device path] returns [Some path'] where [path'] is the backend path in - the driver domain corresponding to the frontend device [path] in this domain. *) -let find_backend_device path = - try - let open Ezxenstore_core.Xenstore in - (* If we're looking at a xen frontend device, see if the backend - is in the same domain. If so check if it looks like a .vhd *) - let rdev = (Unix.stat path).Unix.st_rdev in - let major = rdev / 256 and minor = rdev mod 256 in - let link = - Unix.readlink (Printf.sprintf "/sys/dev/block/%d:%d/device" major minor) - in - match List.rev (String.split '/' link) with - | id :: "xen" :: "devices" :: _ - when Astring.String.is_prefix ~affix:"vbd-" id -> - let id = int_of_string (String.sub id 4 (String.length id - 4)) in - with_xs (fun xs -> - let self = xs.Xs.read "domid" in - let backend = - xs.Xs.read (Printf.sprintf "device/vbd/%d/backend" id) - in - let params = xs.Xs.read (Printf.sprintf "%s/params" backend) in - match String.split '/' backend with - | "local" :: "domain" :: bedomid :: _ -> - if not (self = bedomid) then - Helpers.internal_error - "find_backend_device: Got domid %s but expected %s" bedomid - self ; - Some params - | _ -> - raise Not_found - ) - | _ -> - raise Not_found - with _ -> None - -(** [backing_file_of_device path] returns (Some backing_file) where 'backing_file' - is the leaf backing a particular device [path] (with a driver of type - [driver] or None. [path] may either be a blktap2 device *or* a blkfront - device backed by a blktap2 device. If the latter then the script must be - run in the same domain as blkback. *) -let backing_file_of_device path ~driver = - let tapdisk_of_path path = - try - match Tapctl.of_device (Tapctl.create ()) path with - | _, _, Some (typ, backing_file) when typ = driver -> - Some backing_file - | _, _, _ -> - raise Not_found - with - | Tapctl.Not_blktap -> ( - debug "Device %s is not controlled by blktap" path ; - (* Check if it is a [driver] behind a NBD device *) - Stream_vdi.(get_nbd_device path |> image_behind_nbd_device) |> function - | Some (typ, backing_file) when typ = driver -> - debug "%s is a %s behind NBD device %s" backing_file driver path ; - Some backing_file - | _ -> - None - ) - | Tapctl.Not_a_device -> - debug "%s is not a device" path ; - None - | _ -> - debug "Device %s has an unknown driver" path ; - None - in - find_backend_device path |> Option.value ~default:path |> tapdisk_of_path - let send progress_cb ?relative_to (protocol : string) (dest_format : string) (s : Unix.file_descr) (path : string) (size : Int64.t) (prefix : string) = - let vhd_of_device = backing_file_of_device ~driver:"vhd" in + let vhd_of_device = Xapi_vdi_helpers.backing_file_of_device ~driver:"vhd" in let s' = Uuidx.(to_string (make ())) in let source_format, source = - match (Stream_vdi.get_nbd_device path, vhd_of_device path, relative_to) with + match + (Xapi_vdi_helpers.get_nbd_device path, vhd_of_device path, relative_to) + with | Some (nbd_server, exportname), _, None -> ( "nbdhybrid" , Printf.sprintf "%s:%s:%s:%Ld" path nbd_server exportname size diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index 84db627c719..a6d6c8905b8 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml +++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -300,3 +300,129 @@ let read_raw ~__context ~vdi = Some (VDI_CStruct.read cstruct) ) ) + +type nbd_connect_info = {path: string; exportname: string} [@@deriving rpc] + +let get_device_numbers path = + let rdev = (Unix.LargeFile.stat path).Unix.LargeFile.st_rdev in + let major = rdev / 256 and minor = rdev mod 256 in + (major, minor) + +let is_nbd_device path = + let nbd_device_num = 43 in + let major, _ = get_device_numbers path in + major = nbd_device_num + +let get_nbd_device path = + let nbd_device_prefix = "/dev/nbd" in + if + Astring.String.is_prefix ~affix:nbd_device_prefix path && is_nbd_device path + then + let nbd_number = + Astring.String.with_range ~first:(String.length nbd_device_prefix) path + in + let {path; exportname} = + (* persistent_nbd_info_dir is written from nbd_client_manager.py as part of VBD plug*) + let persistent_nbd_info_dir = "/var/run/nonpersistent/nbd" in + let filename = persistent_nbd_info_dir ^ "/" ^ nbd_number in + Xapi_stdext_unix.Unixext.string_of_file filename + |> Jsonrpc.of_string + |> nbd_connect_info_of_rpc + in + Some (path, exportname) + else + None + +(* Copied from vhd-tool/src/image.ml. + * Just keep the situation of xapi doesn't depend on vhd-tool OCaml module. + *) +let image_behind_nbd_device = function + | Some (path, _exportname) as image -> + (* The nbd server path exposed by tapdisk can lead us to the actual image + file below. Following the symlink gives a path like + `/run/blktap-control/nbd.`, + containing the tapdisk pid and minor number. Using this information, + we can get the file path from tap-ctl. + *) + let default _ _ = image in + let filename = Unix.realpath path |> Filename.basename in + Scanf.ksscanf filename default "nbd%d.%d" (fun pid minor -> + match Tapctl.find (Tapctl.create ()) ~pid ~minor with + | _, _, Some ("vhd", vhd) -> + Some ("vhd", vhd) + | _, _, Some ("aio", vhd) -> + Some ("raw", vhd) + | _, _, _ | (exception _) -> + None + ) + | _ -> + None + +(** [find_backend_device path] returns [Some path'] where [path'] is the backend path in + the driver domain corresponding to the frontend device [path] in this domain. *) +let find_backend_device path = + try + let open Ezxenstore_core.Xenstore in + (* If we're looking at a xen frontend device, see if the backend + is in the same domain. If so check if it looks like a .vhd *) + let rdev = (Unix.stat path).Unix.st_rdev in + let major = rdev / 256 and minor = rdev mod 256 in + let link = + Unix.readlink (Printf.sprintf "/sys/dev/block/%d:%d/device" major minor) + in + match List.rev (Xapi_stdext_std.Xstringext.String.split '/' link) with + | id :: "xen" :: "devices" :: _ + when Astring.String.is_prefix ~affix:"vbd-" id -> + let id = int_of_string (String.sub id 4 (String.length id - 4)) in + with_xs (fun xs -> + let self = xs.Xs.read "domid" in + let backend = + xs.Xs.read (Printf.sprintf "device/vbd/%d/backend" id) + in + let params = xs.Xs.read (Printf.sprintf "%s/params" backend) in + match Xapi_stdext_std.Xstringext.String.split '/' backend with + | "local" :: "domain" :: bedomid :: _ -> + if not (self = bedomid) then + Helpers.internal_error + "find_backend_device: Got domid %s but expected %s" bedomid + self ; + Some params + | _ -> + raise Not_found + ) + | _ -> + raise Not_found + with _ -> None + +(** [backing_file_of_device path] returns (Some backing_file) where 'backing_file' + is the leaf backing a particular device [path] (with a driver of type + [driver] or None. [path] may either be a blktap2 device *or* a blkfront + device backed by a blktap2 device. If the latter then the script must be + run in the same domain as blkback. *) +let backing_file_of_device path ~driver = + let tapdisk_of_path path = + try + match Tapctl.of_device (Tapctl.create ()) path with + | _, _, Some (typ, backing_file) when typ = driver -> + Some backing_file + | _, _, _ -> + raise Not_found + with + | Tapctl.Not_blktap -> ( + debug "Device %s is not controlled by blktap" path ; + (* Check if it is a [driver] behind a NBD device *) + get_nbd_device path |> image_behind_nbd_device |> function + | Some (typ, backing_file) when typ = driver -> + debug "%s is a %s behind NBD device %s" backing_file driver path ; + Some backing_file + | _ -> + None + ) + | Tapctl.Not_a_device -> + debug "%s is not a device" path ; + None + | _ -> + debug "Device %s has an unknown driver" path ; + None + in + find_backend_device path |> Option.value ~default:path |> tapdisk_of_path From ee1070f6630b61664aab348f11e6d09812602001 Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 26 Nov 2025 09:54:59 +0000 Subject: [PATCH 4/9] xapi_vdi_helpers: Split backing_file_of_device in two Qcow_tool_wrapper and Vhd_tool_wrapper expect a particular driver to be backing the VDI and fall back to handling the VDI as raw otherwise - they will be using backing_file_of_device_with_driver. Stream_vdi, however, will need to branch on the type of the driver, and it will use backing_info_of_device (which also returns the type of the driver) Signed-off-by: Andrii Sultanov --- ocaml/xapi/qcow_tool_wrapper.ml | 2 +- ocaml/xapi/vhd_tool_wrapper.ml | 4 ++- ocaml/xapi/xapi_vdi_helpers.ml | 45 ++++++++++++++++++--------------- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/ocaml/xapi/qcow_tool_wrapper.ml b/ocaml/xapi/qcow_tool_wrapper.ml index 3957bbe4864..6f7ae7fafb9 100644 --- a/ocaml/xapi/qcow_tool_wrapper.ml +++ b/ocaml/xapi/qcow_tool_wrapper.ml @@ -79,7 +79,7 @@ let parse_header qcow_path = let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) (_size : Int64.t) = let qcow_of_device = - Xapi_vdi_helpers.backing_file_of_device ~driver:"qcow2" + Xapi_vdi_helpers.backing_file_of_device_with_driver ~driver:"qcow2" in let qcow_path = qcow_of_device path in diff --git a/ocaml/xapi/vhd_tool_wrapper.ml b/ocaml/xapi/vhd_tool_wrapper.ml index c3460381cb1..2a3d40ae5ce 100644 --- a/ocaml/xapi/vhd_tool_wrapper.ml +++ b/ocaml/xapi/vhd_tool_wrapper.ml @@ -114,7 +114,9 @@ let receive progress_cb format protocol (s : Unix.file_descr) let send progress_cb ?relative_to (protocol : string) (dest_format : string) (s : Unix.file_descr) (path : string) (size : Int64.t) (prefix : string) = - let vhd_of_device = Xapi_vdi_helpers.backing_file_of_device ~driver:"vhd" in + let vhd_of_device = + Xapi_vdi_helpers.backing_file_of_device_with_driver ~driver:"vhd" + in let s' = Uuidx.(to_string (make ())) in let source_format, source = match diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index a6d6c8905b8..f28d129eceb 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml +++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -394,35 +394,40 @@ let find_backend_device path = raise Not_found with _ -> None -(** [backing_file_of_device path] returns (Some backing_file) where 'backing_file' - is the leaf backing a particular device [path] (with a driver of type - [driver] or None. [path] may either be a blktap2 device *or* a blkfront - device backed by a blktap2 device. If the latter then the script must be - run in the same domain as blkback. *) -let backing_file_of_device path ~driver = +(** [backing_info_of_device] returns Some (driver_type, backing_file) for the + leaf backing a particular device [path]. *) +let backing_info_of_device path = let tapdisk_of_path path = try - match Tapctl.of_device (Tapctl.create ()) path with - | _, _, Some (typ, backing_file) when typ = driver -> - Some backing_file - | _, _, _ -> - raise Not_found + let _, _, backing_info = Tapctl.of_device (Tapctl.create ()) path in + backing_info with + | Tapctl.Not_a_device -> + debug "%s is not a device" path ; + None | Tapctl.Not_blktap -> ( debug "Device %s is not controlled by blktap" path ; (* Check if it is a [driver] behind a NBD device *) get_nbd_device path |> image_behind_nbd_device |> function - | Some (typ, backing_file) when typ = driver -> - debug "%s is a %s behind NBD device %s" backing_file driver path ; - Some backing_file + | Some (typ, backing_file) as backing_info -> + debug "%s is a %s behind NBD device %s" backing_file typ path ; + backing_info | _ -> None ) - | Tapctl.Not_a_device -> - debug "%s is not a device" path ; - None - | _ -> - debug "Device %s has an unknown driver" path ; - None in find_backend_device path |> Option.value ~default:path |> tapdisk_of_path + +(** [backing_file_of_device_with_driver path driver] returns Some backing_file + where [backing_file] is the leaf backing a particular device [path] + (with a driver of type [driver]) or None. + [path] may either be a blktap2 device *or* a blkfront device backed by a + blktap2 device. If the latter then the script must be + run in the same domain as blkback. *) +let backing_file_of_device_with_driver path ~driver = + match backing_info_of_device path with + | Some (typ, backing_file) when typ = driver -> + Some backing_file + | _ -> + debug "Device %s has an unknown driver" path ; + None From 77059776ac20c281fe1446f86047aa53aedbde14 Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 26 Nov 2025 10:47:27 +0000 Subject: [PATCH 5/9] vhd_tool_wrapper: Add parse_header to determine allocated blocks Split common code used by {Vhd,Qcow}_tool_wrapper into a new vhd_qcow_parsing module. Since Vhd_tool_wrapper.run_vhd_tool is hardcoded to read the progress percentage printed by vhd-tool, we have to use the more generic Vhd_qcow_parsing.run_qcow_tool to run vhd-tool. Since VHD and QCOW follow the same format of JSON, use the same parse_header function. Signed-off-by: Andrii Sultanov --- ocaml/xapi/qcow_tool_wrapper.ml | 53 +++++-------------------------- ocaml/xapi/vhd_qcow_parsing.ml | 56 +++++++++++++++++++++++++++++++++ ocaml/xapi/vhd_qcow_parsing.mli | 24 ++++++++++++++ ocaml/xapi/vhd_tool_wrapper.ml | 17 ++++++++++ 4 files changed, 105 insertions(+), 45 deletions(-) create mode 100644 ocaml/xapi/vhd_qcow_parsing.ml create mode 100644 ocaml/xapi/vhd_qcow_parsing.mli diff --git a/ocaml/xapi/qcow_tool_wrapper.ml b/ocaml/xapi/qcow_tool_wrapper.ml index 6f7ae7fafb9..5bcfda36317 100644 --- a/ocaml/xapi/qcow_tool_wrapper.ml +++ b/ocaml/xapi/qcow_tool_wrapper.ml @@ -12,36 +12,6 @@ * GNU Lesser General Public License for more details. *) -module D = Debug.Make (struct let name = __MODULE__ end) - -open D - -let run_qcow_tool qcow_tool ?(replace_fds = []) ?input_fd ?output_fd - (_progress_cb : int -> unit) (args : string list) = - info "Executing %s %s" qcow_tool (String.concat " " args) ; - let open Forkhelpers in - match - with_logfile_fd "qcow-tool" (fun log_fd -> - let pid = - safe_close_and_exec input_fd output_fd (Some log_fd) replace_fds - qcow_tool args - in - let _, status = waitpid pid in - if status <> Unix.WEXITED 0 then ( - error "qcow-tool failed, returning VDI_IO_ERROR" ; - raise - (Api_errors.Server_error - (Api_errors.vdi_io_error, ["Device I/O errors"]) - ) - ) - ) - with - | Success (out, _) -> - debug "qcow-tool successful export (%s)" out - | Failure (out, _e) -> - error "qcow-tool output: %s" out ; - raise (Api_errors.Server_error (Api_errors.vdi_io_error, [out])) - let update_task_progress (__context : Context.t) (x : int) = TaskHelper.set_progress ~__context (float_of_int x /. 100.) @@ -49,7 +19,7 @@ let receive (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) = let args = ["stream_decode"; path] in let qcow_tool = !Xapi_globs.qcow_stream_tool in - run_qcow_tool qcow_tool progress_cb args ~input_fd:unix_fd + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args ~input_fd:unix_fd let read_header qcow_path = let args = ["read_headers"; qcow_path] in @@ -58,23 +28,16 @@ let read_header qcow_path = let progress_cb _ = () in Xapi_stdext_pervasives.Pervasiveext.finally - (fun () -> run_qcow_tool qcow_tool progress_cb args ~output_fd:pipe_writer) + (fun () -> + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args + ~output_fd:pipe_writer + ) (fun () -> Unix.close pipe_writer) ; pipe_reader let parse_header qcow_path = let pipe_reader = read_header qcow_path in - let ic = Unix.in_channel_of_descr pipe_reader in - let buf = Buffer.create 4096 in - let json = Yojson.Basic.from_channel ~buf ~fname:"qcow_header.json" ic in - In_channel.close ic ; - let cluster_size = - 1 lsl Yojson.Basic.Util.(member "cluster_bits" json |> to_int) - in - let cluster_list = - Yojson.Basic.Util.(member "data_clusters" json |> to_list |> List.map to_int) - in - (cluster_size, cluster_list) + Vhd_qcow_parsing.parse_header pipe_reader let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) (_size : Int64.t) = @@ -107,8 +70,8 @@ let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) let replace_fds = Option.map (fun fd -> [(unique_string, fd)]) diff_fd in Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> - run_qcow_tool qcow_tool progress_cb args ?input_fd ~output_fd:unix_fd - ?replace_fds + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args ?input_fd + ~output_fd:unix_fd ?replace_fds ) (fun () -> Option.iter Unix.close input_fd ; diff --git a/ocaml/xapi/vhd_qcow_parsing.ml b/ocaml/xapi/vhd_qcow_parsing.ml new file mode 100644 index 00000000000..0043edeeaf2 --- /dev/null +++ b/ocaml/xapi/vhd_qcow_parsing.ml @@ -0,0 +1,56 @@ +(* + * Copyright (C) 2025 Vates. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = __MODULE__ end) + +open D + +let run_tool tool ?(replace_fds = []) ?input_fd ?output_fd + (_progress_cb : int -> unit) (args : string list) = + info "Executing %s %s" tool (String.concat " " args) ; + let open Forkhelpers in + match + with_logfile_fd tool (fun log_fd -> + let pid = + safe_close_and_exec input_fd output_fd (Some log_fd) replace_fds tool + args + in + let _, status = waitpid pid in + if status <> Unix.WEXITED 0 then ( + error "qcow-tool failed, returning VDI_IO_ERROR" ; + raise + (Api_errors.Server_error + (Api_errors.vdi_io_error, ["Device I/O errors"]) + ) + ) + ) + with + | Success (out, _) -> + debug "%s successful export (%s)" tool out + | Failure (out, _e) -> + error "%s output: %s" tool out ; + raise (Api_errors.Server_error (Api_errors.vdi_io_error, [out])) + +let parse_header pipe_reader = + let ic = Unix.in_channel_of_descr pipe_reader in + let buf = Buffer.create 4096 in + let json = Yojson.Basic.from_channel ~buf ~fname:"header.json" ic in + In_channel.close ic ; + let cluster_size = + 1 lsl Yojson.Basic.Util.(member "cluster_bits" json |> to_int) + in + let cluster_list = + Yojson.Basic.Util.(member "data_clusters" json |> to_list |> List.map to_int) + in + (cluster_size, cluster_list) diff --git a/ocaml/xapi/vhd_qcow_parsing.mli b/ocaml/xapi/vhd_qcow_parsing.mli new file mode 100644 index 00000000000..25417c0b91c --- /dev/null +++ b/ocaml/xapi/vhd_qcow_parsing.mli @@ -0,0 +1,24 @@ +(* + * Copyright (C) 2025 Vates. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +val run_tool : + string + -> ?replace_fds:(string * Unix.file_descr) list + -> ?input_fd:Unix.file_descr + -> ?output_fd:Unix.file_descr + -> (int -> unit) + -> string list + -> unit + +val parse_header : Unix.file_descr -> int * int list diff --git a/ocaml/xapi/vhd_tool_wrapper.ml b/ocaml/xapi/vhd_tool_wrapper.ml index 2a3d40ae5ce..168b3fa39a5 100644 --- a/ocaml/xapi/vhd_tool_wrapper.ml +++ b/ocaml/xapi/vhd_tool_wrapper.ml @@ -112,6 +112,23 @@ let receive progress_cb format protocol (s : Unix.file_descr) in run_vhd_tool progress_cb args s s' path +let read_vhd_header path = + let vhd_tool = !Xapi_globs.vhd_tool in + let args = ["read_headers"; path] in + let pipe_reader, pipe_writer = Unix.pipe ~cloexec:true () in + + let progress_cb _ = () in + Xapi_stdext_pervasives.Pervasiveext.finally + (fun () -> + Vhd_qcow_parsing.run_tool vhd_tool progress_cb args ~output_fd:pipe_writer + ) + (fun () -> Unix.close pipe_writer) ; + pipe_reader + +let parse_header vhd_path = + let pipe_reader = read_vhd_header vhd_path in + Vhd_qcow_parsing.parse_header pipe_reader + let send progress_cb ?relative_to (protocol : string) (dest_format : string) (s : Unix.file_descr) (path : string) (size : Int64.t) (prefix : string) = let vhd_of_device = From 8c932860438dab1ddd1363e2bf3b24f973cb152d Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 26 Nov 2025 10:56:46 +0000 Subject: [PATCH 6/9] stream_vdi: Only process allocated clusters for VHD and QCOW Reads the bitmaps for VHD- and QCOW-backed VDIs, determines which clusters are allocated and only reads and writes these to the resulting xva. This avoids the need for the "timeout workaround", which is needed when no data has been sent for an extended period of time (so stream_vdi writes a "packet" that doesn't carry any data, just a checksum of an empty body. in case of a compressed export, however, the compressor binary buffers output and this timeout workaround does not work). This also greatly speeds up export of VMs with sparse VDIs. Signed-off-by: Andrii Sultanov --- ocaml/xapi/stream_vdi.ml | 208 +++++++++++++++++++++++++++------------ 1 file changed, 146 insertions(+), 62 deletions(-) diff --git a/ocaml/xapi/stream_vdi.ml b/ocaml/xapi/stream_vdi.ml index 3ce924a6417..7590230312b 100644 --- a/ocaml/xapi/stream_vdi.ml +++ b/ocaml/xapi/stream_vdi.ml @@ -17,6 +17,7 @@ module Zerocheck = Xapi_stdext_zerocheck.Zerocheck module Unixext = Xapi_stdext_unix.Unixext +module ChunkSet = Set.Make (Int) let finally = Xapi_stdext_pervasives.Pervasiveext.finally @@ -194,95 +195,174 @@ let get_chunk_numbers_in_increasing_order descriptor_list offset = let chunks = process [] offset descriptor_list in List.rev chunks +let get_allocated_chunks_from_clusters cluster_size cluster_list = + let chunk_size = Int64.to_int chunk_size in + let chunks_in_cluster = (cluster_size + chunk_size - 1) / chunk_size in + let set = + List.fold_left + (fun set cluster_no -> + let cluster_offset = cluster_no * cluster_size in + let chunk_no = cluster_offset / chunk_size in + let chunks_to_add = + Seq.init chunks_in_cluster (fun i -> chunk_no + i) + in + ChunkSet.add_seq chunks_to_add set + ) + ChunkSet.empty cluster_list + in + set + let send_one ofd (__context : Context.t) rpc session_id progress refresh_session (prefix, vdi_ref, _size) = let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in + (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) + let last_transmission_time = ref 0L in let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in + + (* Generic function that reads a chunk of [this_chunk_size] at [offset] and, + if [write_check chunk] is true, then writes the chunk to the filename with + [this_chunk_no] suffix *) + let actually_write_chunk ~(this_chunk_no : int) ~(offset : int64) + ~(this_chunk_size : int) ~ifd ~write_check ~first_or_last ~seek = + let buffer = + if this_chunk_size = Int64.to_int chunk_size then + reusable_buffer + else + Bytes.make this_chunk_size '\000' + in + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in + if seek then + Unix.LargeFile.lseek ifd offset Unix.SEEK_SET |> ignore ; + Unixext.really_read ifd buffer 0 this_chunk_size ; + if write_check buffer first_or_last then ( + last_transmission_time := Mtime_clock.now_ns () ; + write_block ~__context filename buffer ofd this_chunk_size + ) ; + made_progress __context progress (Int64.of_int this_chunk_size) + in + with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 (fun ifd dom0_path -> match Xapi_vdi_helpers.get_nbd_device dom0_path with - | None -> - (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) - let last_transmission_time = ref 0. in + | None -> ( (* NB. It used to be that chunks could be larger than a native int *) (* could handle, but this is no longer the case! Ensure all chunks *) (* are strictly less than 2^30 bytes *) - let rec stream_from (chunk_no : int) (offset : int64) = + let rec write_chunk (this_chunk_no : int) (offset : int64) + ~write_check ~seek ~timeout_workaround = refresh_session () ; let remaining = Int64.sub size offset in - if remaining > 0L then ( - let this_chunk = min remaining chunk_size in - let last_chunk = this_chunk = remaining in - let this_chunk = Int64.to_int this_chunk in - let filename = Printf.sprintf "%s/%08d" prefix chunk_no in - let now = Unix.gettimeofday () in - let time_since_transmission = now -. !last_transmission_time in + if remaining > 0L then + let this_chunk_size = + min (Int64.to_int remaining) (Int64.to_int chunk_size) + in + let last_chunk = this_chunk_size = Int64.to_int remaining in + let now = Mtime_clock.now_ns () in + let time_since_transmission = + Int64.sub now !last_transmission_time + in (* We always include the first and last blocks *) - let first_or_last = chunk_no = 0 || last_chunk in - if time_since_transmission > 5. && not first_or_last then ( + let first_or_last = this_chunk_no = 0 || last_chunk in + if + time_since_transmission > 5_000_000_000L + && (not first_or_last) + && timeout_workaround + then ( last_transmission_time := now ; + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in write_block ~__context filename Bytes.empty ofd 0 ; (* no progress has been made *) - stream_from (chunk_no + 1) offset - ) else - let buffer = - if Int64.of_int this_chunk = chunk_size then - reusable_buffer - else - Bytes.make this_chunk '\000' - in - Unixext.really_read ifd buffer 0 this_chunk ; - if - first_or_last - || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) - then ( - last_transmission_time := now ; - write_block ~__context filename buffer ofd this_chunk - ) ; - made_progress __context progress (Int64.of_int this_chunk) ; - stream_from (chunk_no + 1) (Int64.add offset chunk_size) - ) + Some offset + ) else ( + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size + ~ifd ~write_check ~first_or_last ~seek ; + Some (Int64.add offset chunk_size) + ) + else + None in - stream_from 0 0L - | Some (path, exportname) -> - let last_transmission_time = ref 0L in - let actually_write_chunk (this_chunk_no : int) (this_chunk_size : int) - = - let buffer = - if this_chunk_size = Int64.to_int chunk_size then - reusable_buffer - else - Bytes.make this_chunk_size '\000' + + (* Read all clusters and check if they are filled with zeros *) + let rec stream_from (this_chunk_no : int) (offset : int64) + ~write_check ~seek = + let new_offset = + write_chunk this_chunk_no offset ~write_check ~seek + ~timeout_workaround:true in - let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in - Unix.LargeFile.lseek ifd - (Int64.mul (Int64.of_int this_chunk_no) chunk_size) - Unix.SEEK_SET - |> ignore ; - Unixext.really_read ifd buffer 0 this_chunk_size ; - last_transmission_time := Mtime_clock.now_ns () ; - write_block ~__context filename buffer ofd this_chunk_size ; - made_progress __context progress (Int64.of_int this_chunk_size) + Option.iter + (fun offset -> + stream_from (this_chunk_no + 1) offset ~write_check ~seek + ) + new_offset + in + let write_check buffer first_or_last = + first_or_last + || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) + in + + let backing_info = + Xapi_vdi_helpers.backing_info_of_device dom0_path in + match backing_info with + | Some (driver, path) when driver = "vhd" || driver = "qcow2" -> ( + try + (* Read backing file headers, then only read and write + allocated clusters from the bitmap *) + let cluster_size, cluster_list = + match driver with + | "vhd" -> + Vhd_tool_wrapper.parse_header path + | "qcow2" -> + Qcow_tool_wrapper.parse_header path + | _ -> + failwith "unreachable" + in + let set = + get_allocated_chunks_from_clusters cluster_size cluster_list + in + (* First and last chunks are always written - it's a limitation + of the XVA format *) + let last_chunk = + Int64.((to_int size - to_int chunk_size + 1) / to_int chunk_size) + in + let set = set |> ChunkSet.add 0 |> ChunkSet.add last_chunk in + ChunkSet.iter + (fun this_chunk_no -> + let offset = Int64.(mul (of_int this_chunk_no) chunk_size) in + let _ = + write_chunk this_chunk_no offset + ~write_check:(fun _ _ -> true) + ~seek:true ~timeout_workaround:false + in + () + ) + set + with e -> + debug "%s: Falling back to reading the whole raw disk after %s" + __FUNCTION__ (Printexc.to_string e) ; + stream_from 0 0L ~write_check ~seek:false + ) + | _ -> + stream_from 0 0L ~write_check ~seek:false + ) + | Some (path, exportname) -> let rec stream_from_offset (offset : int64) = let remaining = Int64.sub size offset in if remaining > 0L then ( let this_chunk_size = min (Int64.to_int chunk_size) (Int64.to_int remaining) in - let this_chunk_no = Int64.div offset chunk_size in + let this_chunk_no = Int64.(to_int (div offset chunk_size)) in let now = Mtime_clock.now_ns () in let time_since_transmission = Int64.sub now !last_transmission_time in - if - offset = 0L - || remaining <= chunk_size - || time_since_transmission > 5000000000L - then ( - actually_write_chunk - (Int64.to_int this_chunk_no) - this_chunk_size ; + let first_or_last = offset = 0L || remaining <= chunk_size in + if first_or_last || time_since_transmission > 5000000000L then ( + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size + ~ifd ~first_or_last + ~write_check:(fun _ _ -> true) + ~seek:true ; stream_from_offset (Int64.add offset (Int64.of_int this_chunk_size)) ) else @@ -313,8 +393,12 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session in List.iter (fun chunk -> - actually_write_chunk (Int64.to_int chunk) - (Int64.to_int chunk_size) + let offset = Int64.mul chunk chunk_size in + actually_write_chunk ~this_chunk_no:(Int64.to_int chunk) + ~offset ~this_chunk_size:(Int64.to_int chunk_size) ~ifd + ~first_or_last:false + ~write_check:(fun _ _ -> true) + ~seek:true ) chunks ; stream_from_offset (Int64.add offset sparseness_size) From a7b78d96635377ea5ae25e76f35b5cd5bc23edcf Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 10 Dec 2025 08:43:26 +0000 Subject: [PATCH 7/9] tapctl: Return Option instead of raising Not_found Some of the users of the function did not handle exceptions correctly - make the "not found" case explicit with an option. Signed-off-by: Andrii Sultanov --- ocaml/tapctl/tapctl.ml | 4 ++-- ocaml/tapctl/tapctl.mli | 2 +- ocaml/vhd-tool/cli/sparse_dd.ml | 4 ++-- ocaml/vhd-tool/src/image.ml | 6 +++--- ocaml/xapi/storage_smapiv1_migrate.ml | 18 ++++++++---------- ocaml/xapi/xapi_vdi_helpers.ml | 3 ++- 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/ocaml/tapctl/tapctl.ml b/ocaml/tapctl/tapctl.ml index 075eea8aba2..1974b67dde9 100644 --- a/ocaml/tapctl/tapctl.ml +++ b/ocaml/tapctl/tapctl.ml @@ -535,9 +535,9 @@ let of_device ctx path = if driver_of_major major <> "tapdev" then raise Not_blktap ; match List.filter (fun (tapdev, _, _) -> tapdev.minor = minor) (list ctx) with | [t] -> - t + Some t | _ -> - raise Not_found + None let find ctx ~pid ~minor = match list ~t:{minor; tapdisk_pid= pid} ctx with diff --git a/ocaml/tapctl/tapctl.mli b/ocaml/tapctl/tapctl.mli index 9ea93173251..6304fd8a9b9 100644 --- a/ocaml/tapctl/tapctl.mli +++ b/ocaml/tapctl/tapctl.mli @@ -91,7 +91,7 @@ exception Not_blktap (** Thrown by [of_device x] when [x] is not a device *) exception Not_a_device -val of_device : context -> string -> t +val of_device : context -> string -> t option (** Given a path to a device, return the corresponding tap information *) val find : context -> pid:int -> minor:int -> t diff --git a/ocaml/vhd-tool/cli/sparse_dd.ml b/ocaml/vhd-tool/cli/sparse_dd.ml index e593a1bb049..edb1fb9dc42 100644 --- a/ocaml/vhd-tool/cli/sparse_dd.ml +++ b/ocaml/vhd-tool/cli/sparse_dd.ml @@ -244,14 +244,14 @@ let with_paused_tapdisk path f = let path = find_backend_device path |> Opt.default path in let context = Tapctl.create () in match Tapctl.of_device context path with - | tapdev, _, Some (_driver, path) -> + | Some (tapdev, _, Some (_driver, path)) -> debug "pausing tapdisk for %s" path ; Tapctl.pause context tapdev ; after f (fun () -> debug "unpausing tapdisk for %s" path ; Tapctl.unpause context tapdev path Tapctl.Vhd ) - | _, _, _ -> + | _ -> failwith (Printf.sprintf "Failed to pause tapdisk for %s" path) (* Record when the binary started for performance measuring *) diff --git a/ocaml/vhd-tool/src/image.ml b/ocaml/vhd-tool/src/image.ml index 8a4d990ec22..95250513211 100644 --- a/ocaml/vhd-tool/src/image.ml +++ b/ocaml/vhd-tool/src/image.ml @@ -66,11 +66,11 @@ let image_behind_nbd_device image = let of_device path = match Tapctl.of_device (Tapctl.create ()) path with - | _, _, Some ("vhd", vhd) -> + | Some (_, _, Some ("vhd", vhd)) -> Some (`Vhd vhd) - | _, _, Some ("aio", vhd) -> + | Some (_, _, Some ("aio", vhd)) -> Some (`Raw vhd) - | _, _, _ -> + | _ -> None | exception Tapctl.Not_blktap -> get_nbd_device path |> image_behind_nbd_device diff --git a/ocaml/xapi/storage_smapiv1_migrate.ml b/ocaml/xapi/storage_smapiv1_migrate.ml index d64ba5d0d44..5ccf597c869 100644 --- a/ocaml/xapi/storage_smapiv1_migrate.ml +++ b/ocaml/xapi/storage_smapiv1_migrate.ml @@ -106,18 +106,16 @@ let tapdisk_of_attach_info (backend : Storage_interface.backend) = match (blockdevices, nbds) with | blockdevice :: _, _ -> ( let path = blockdevice.Storage_interface.path in - try - match Tapctl.of_device (Tapctl.create ()) path with - | tapdev, _, _ -> - Some tapdev - with - | Tapctl.Not_blktap -> + match Tapctl.of_device (Tapctl.create ()) path with + | Some (tapdev, _, _) -> + Some tapdev + | exception Tapctl.Not_blktap -> D.debug "Device %s is not controlled by blktap" path ; None - | Tapctl.Not_a_device -> + | exception Tapctl.Not_a_device -> D.debug "%s is not a device" path ; None - | _ -> + | (exception _) | None -> D.debug "Device %s has an unknown driver" path ; None ) @@ -295,8 +293,8 @@ module Copy = struct perform_cleanup_actions !on_fail ; raise e - (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will - find the nearest vdi on the [dest] sr, and if there is no such vdi, it will + (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will + find the nearest vdi on the [dest] sr, and if there is no such vdi, it will create one. *) let copy_into_sr ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = D.debug "copy sr:%s vdi:%s url:%s dest:%s verify_dest:%B" diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index f28d129eceb..7192ecb6fd6 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml +++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -399,7 +399,8 @@ let find_backend_device path = let backing_info_of_device path = let tapdisk_of_path path = try - let _, _, backing_info = Tapctl.of_device (Tapctl.create ()) path in + let ( let* ) = Option.bind in + let* _, _, backing_info = Tapctl.of_device (Tapctl.create ()) path in backing_info with | Tapctl.Not_a_device -> From 2f26d4c94a57afda7406728ab94ab3f6ef028540 Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 10 Dec 2025 08:50:11 +0000 Subject: [PATCH 8/9] stream_vdi: Move to using Mtime_clock.counter Signed-off-by: Andrii Sultanov --- ocaml/xapi/stream_vdi.ml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/ocaml/xapi/stream_vdi.ml b/ocaml/xapi/stream_vdi.ml index 7590230312b..f3bc5c8fc02 100644 --- a/ocaml/xapi/stream_vdi.ml +++ b/ocaml/xapi/stream_vdi.ml @@ -216,7 +216,10 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session (prefix, vdi_ref, _size) = let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) - let last_transmission_time = ref 0L in + let time_since_transmission = ref (Mtime_clock.counter ()) in + let need_to_retransmit time_since = + Mtime.Span.(is_longer ~than:(5 * s) time_since) + in let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in (* Generic function that reads a chunk of [this_chunk_size] at [offset] and, @@ -235,7 +238,7 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session Unix.LargeFile.lseek ifd offset Unix.SEEK_SET |> ignore ; Unixext.really_read ifd buffer 0 this_chunk_size ; if write_check buffer first_or_last then ( - last_transmission_time := Mtime_clock.now_ns () ; + time_since_transmission := Mtime_clock.counter () ; write_block ~__context filename buffer ofd this_chunk_size ) ; made_progress __context progress (Int64.of_int this_chunk_size) @@ -257,18 +260,14 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session min (Int64.to_int remaining) (Int64.to_int chunk_size) in let last_chunk = this_chunk_size = Int64.to_int remaining in - let now = Mtime_clock.now_ns () in - let time_since_transmission = - Int64.sub now !last_transmission_time - in (* We always include the first and last blocks *) let first_or_last = this_chunk_no = 0 || last_chunk in if - time_since_transmission > 5_000_000_000L + need_to_retransmit (Mtime_clock.count !time_since_transmission) && (not first_or_last) && timeout_workaround then ( - last_transmission_time := now ; + time_since_transmission := Mtime_clock.counter () ; let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in write_block ~__context filename Bytes.empty ofd 0 ; (* no progress has been made *) @@ -353,12 +352,12 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session min (Int64.to_int chunk_size) (Int64.to_int remaining) in let this_chunk_no = Int64.(to_int (div offset chunk_size)) in - let now = Mtime_clock.now_ns () in - let time_since_transmission = - Int64.sub now !last_transmission_time - in let first_or_last = offset = 0L || remaining <= chunk_size in - if first_or_last || time_since_transmission > 5000000000L then ( + if + first_or_last + || need_to_retransmit + (Mtime_clock.count !time_since_transmission) + then ( actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size ~ifd ~first_or_last ~write_check:(fun _ _ -> true) From 2adb82ebd188082ad97a5b8e3e67b89bfa774e00 Mon Sep 17 00:00:00 2001 From: Andrii Sultanov Date: Wed, 10 Dec 2025 11:28:38 +0000 Subject: [PATCH 9/9] dune-project: Add yojson as vhd-tool dependency Signed-off-by: Andrii Sultanov --- dune-project | 3 ++- opam/vhd-tool.opam | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dune-project b/dune-project index 8d329288de3..4283a97c078 100644 --- a/dune-project +++ b/dune-project @@ -536,7 +536,8 @@ (xen-api-client-lwt (= :version)) xenstore - xenstore_transport)) + xenstore_transport + yojson)) (package (name vhd-format)) diff --git a/opam/vhd-tool.opam b/opam/vhd-tool.opam index a1e46091372..0edc71774b2 100644 --- a/opam/vhd-tool.opam +++ b/opam/vhd-tool.opam @@ -40,6 +40,7 @@ depends: [ "xen-api-client-lwt" {= version} "xenstore" "xenstore_transport" + "yojson" "odoc" {with-doc} ] build: [