diff --git a/dune-project b/dune-project index 8d329288de3..4283a97c078 100644 --- a/dune-project +++ b/dune-project @@ -536,7 +536,8 @@ (xen-api-client-lwt (= :version)) xenstore - xenstore_transport)) + xenstore_transport + yojson)) (package (name vhd-format)) diff --git a/ocaml/libs/vhd/vhd_format/dune b/ocaml/libs/vhd/vhd_format/dune index bafac365189..eea7d1960a0 100644 --- a/ocaml/libs/vhd/vhd_format/dune +++ b/ocaml/libs/vhd/vhd_format/dune @@ -2,5 +2,6 @@ (name vhd_format) (public_name vhd-format) (flags :standard -w -32-34-37) - (libraries stdlib-shims (re_export bigarray-compat) cstruct io-page rresult unix uuidm) + (libraries stdlib-shims (re_export bigarray-compat) cstruct io-page rresult + unix uuidm yojson) (preprocess (pps ppx_cstruct))) diff --git a/ocaml/libs/vhd/vhd_format/f.ml b/ocaml/libs/vhd/vhd_format/f.ml index 79128d0035f..d12329bd760 100644 --- a/ocaml/libs/vhd/vhd_format/f.ml +++ b/ocaml/libs/vhd/vhd_format/f.ml @@ -2883,6 +2883,37 @@ functor let raw ?from (vhd : fd Vhd.t) = raw_common ?from vhd + let vhd_blocks_to_json (t : fd Vhd.t) = + let block_size_sectors_shift = + t.Vhd.header.Header.block_size_sectors_shift + in + let max_table_entries = Vhd.used_max_table_entries t in + + let include_block = include_block None t in + + let blocks = + Seq.init max_table_entries Fun.id + |> Seq.filter_map (fun i -> + if include_block i then + Some (`Int i) + else + None + ) + |> List.of_seq + in + let json = + `Assoc + [ + ( "virtual_size" + , `Int (Int64.to_int t.Vhd.footer.Footer.current_size) + ) + ; ("cluster_bits", `Int (block_size_sectors_shift + sector_shift)) + ; ("data_clusters", `List blocks) + ] + in + let json_string = Yojson.to_string json in + print_string json_string ; return () + let vhd_common ?from ?raw ?(emit_batmap = false) (t : fd Vhd.t) = let block_size_sectors_shift = t.Vhd.header.Header.block_size_sectors_shift @@ -3119,6 +3150,8 @@ functor let vhd ?from (raw : 'a) (vhd : fd Vhd.t) = Vhd_input.vhd_common ?from ~raw vhd + + 
let blocks_json = Vhd_input.vhd_blocks_to_json end (* Create a VHD stream from data on t, using `include_block` guide us which blocks have data *) diff --git a/ocaml/libs/vhd/vhd_format/f.mli b/ocaml/libs/vhd/vhd_format/f.mli index ae300a9c95e..40de5f9d902 100644 --- a/ocaml/libs/vhd/vhd_format/f.mli +++ b/ocaml/libs/vhd/vhd_format/f.mli @@ -474,6 +474,8 @@ module From_file : functor (F : S.FILE) -> sig copies from the virtual disk [raw]. If [from] is provided then the stream will contain only the virtual updates required to transform [from] into [t] *) + + val blocks_json : fd Vhd.t -> unit t end module Raw_input : sig diff --git a/ocaml/tapctl/tapctl.ml b/ocaml/tapctl/tapctl.ml index 075eea8aba2..1974b67dde9 100644 --- a/ocaml/tapctl/tapctl.ml +++ b/ocaml/tapctl/tapctl.ml @@ -535,9 +535,9 @@ let of_device ctx path = if driver_of_major major <> "tapdev" then raise Not_blktap ; match List.filter (fun (tapdev, _, _) -> tapdev.minor = minor) (list ctx) with | [t] -> - t + Some t | _ -> - raise Not_found + None let find ctx ~pid ~minor = match list ~t:{minor; tapdisk_pid= pid} ctx with diff --git a/ocaml/tapctl/tapctl.mli b/ocaml/tapctl/tapctl.mli index 9ea93173251..6304fd8a9b9 100644 --- a/ocaml/tapctl/tapctl.mli +++ b/ocaml/tapctl/tapctl.mli @@ -91,7 +91,7 @@ exception Not_blktap (** Thrown by [of_device x] when [x] is not a device *) exception Not_a_device -val of_device : context -> string -> t +val of_device : context -> string -> t option (** Given a path to a device, return the corresponding tap information *) val find : context -> pid:int -> minor:int -> t diff --git a/ocaml/vhd-tool/cli/main.ml b/ocaml/vhd-tool/cli/main.ml index c163763c9df..fb950be35e0 100644 --- a/ocaml/vhd-tool/cli/main.ml +++ b/ocaml/vhd-tool/cli/main.ml @@ -385,9 +385,29 @@ let stream_cmd = , Cmd.info "stream" ~sdocs:_common_options ~doc ~man ) +let read_headers_cmd = + let doc = + "Parse VHD headers and output allocated blocks information in JSON format \ + like: {'virtual_size': X, 
'cluster_bits': X, 'data_clusters': [1,2,3]}" + in + let source = (* required positional argument 0: the VHD file to inspect *) + let doc = "Path to the VHD file" in + Arg.(required & pos 0 (some file) None & info [] ~doc) + in + ( Term.(ret (const Impl.read_headers $ common_options_t $ source)) + , Cmd.info "read_headers" ~sdocs:_common_options ~doc + ) + let cmds = [ - info_cmd; contents_cmd; get_cmd; create_cmd; check_cmd; serve_cmd; stream_cmd + info_cmd + ; contents_cmd + ; get_cmd + ; create_cmd + ; check_cmd + ; serve_cmd + ; stream_cmd + ; read_headers_cmd ] |> List.map (fun (t, i) -> Cmd.v i t) diff --git a/ocaml/vhd-tool/cli/sparse_dd.ml b/ocaml/vhd-tool/cli/sparse_dd.ml index e593a1bb049..edb1fb9dc42 100644 --- a/ocaml/vhd-tool/cli/sparse_dd.ml +++ b/ocaml/vhd-tool/cli/sparse_dd.ml @@ -244,14 +244,14 @@ let with_paused_tapdisk path f = let path = find_backend_device path |> Opt.default path in let context = Tapctl.create () in match Tapctl.of_device context path with - | tapdev, _, Some (_driver, path) -> + | Some (tapdev, _, Some (_driver, path)) -> debug "pausing tapdisk for %s" path ; Tapctl.pause context tapdev ; after f (fun () -> debug "unpausing tapdisk for %s" path ; Tapctl.unpause context tapdev path Tapctl.Vhd ) - | _, _, _ -> + | _ -> failwith (Printf.sprintf "Failed to pause tapdisk for %s" path) (* Record when the binary started for performance measuring *) diff --git a/ocaml/vhd-tool/src/image.ml b/ocaml/vhd-tool/src/image.ml index 8a4d990ec22..95250513211 100644 --- a/ocaml/vhd-tool/src/image.ml +++ b/ocaml/vhd-tool/src/image.ml @@ -66,11 +66,11 @@ let image_behind_nbd_device image = let of_device path = match Tapctl.of_device (Tapctl.create ()) path with - | _, _, Some ("vhd", vhd) -> + | Some (_, _, Some ("vhd", vhd)) -> Some (`Vhd vhd) - | _, _, Some ("aio", vhd) -> + | Some (_, _, Some ("aio", vhd)) -> Some (`Raw vhd) - | _, _, _ -> + | _ -> None | exception Tapctl.Not_blktap -> get_nbd_device path |> image_behind_nbd_device diff --git a/ocaml/vhd-tool/src/impl.ml 
b/ocaml/vhd-tool/src/impl.ml index d067846f565..9dda493a0a2 100644 --- a/ocaml/vhd-tool/src/impl.ml +++ b/ocaml/vhd-tool/src/impl.ml @@ -1164,6 +1164,14 @@ let stream_t common args ?(progress = no_progress_bar) () = args.StreamCommon.tar_filename_prefix args.StreamCommon.good_ciphersuites args.StreamCommon.verify_cert +let read_headers common source = + let path = [Filename.dirname source] in + let thread = + retry common 3 (fun () -> Vhd_IO.openchain ~path source false) >>= fun t -> + Vhd_IO.close t >>= fun () -> Hybrid_input.blocks_json t + in + Lwt_main.run thread ; `Ok () + let stream common args = try Vhd_format_lwt.File.use_unbuffered := common.Common.unbuffered ; diff --git a/ocaml/vhd-tool/src/impl.mli b/ocaml/vhd-tool/src/impl.mli index 2ffa08da6ce..13fe7ba6853 100644 --- a/ocaml/vhd-tool/src/impl.mli +++ b/ocaml/vhd-tool/src/impl.mli @@ -35,6 +35,9 @@ val check : val stream : Common.t -> StreamCommon.t -> [> `Error of bool * string | `Ok of unit] +val read_headers : + Common.t -> string -> [> `Error of bool * string | `Ok of unit] + val serve : Common.t -> string diff --git a/ocaml/xapi/dune b/ocaml/xapi/dune index 8095a5c4bfc..0a9b1099c0f 100644 --- a/ocaml/xapi/dune +++ b/ocaml/xapi/dune @@ -259,6 +259,7 @@ Storage_mux Storage_smapiv1_wrapper Stream_vdi + Xapi_vdi_helpers System_domains Xapi_psr Xapi_services diff --git a/ocaml/xapi/qcow_tool_wrapper.ml b/ocaml/xapi/qcow_tool_wrapper.ml index e3cd13d469b..5bcfda36317 100644 --- a/ocaml/xapi/qcow_tool_wrapper.ml +++ b/ocaml/xapi/qcow_tool_wrapper.ml @@ -12,36 +12,6 @@ * GNU Lesser General Public License for more details. 
*) -module D = Debug.Make (struct let name = __MODULE__ end) - -open D - -let run_qcow_tool qcow_tool ?(replace_fds = []) ?input_fd ?output_fd - (_progress_cb : int -> unit) (args : string list) = - info "Executing %s %s" qcow_tool (String.concat " " args) ; - let open Forkhelpers in - match - with_logfile_fd "qcow-tool" (fun log_fd -> - let pid = - safe_close_and_exec input_fd output_fd (Some log_fd) replace_fds - qcow_tool args - in - let _, status = waitpid pid in - if status <> Unix.WEXITED 0 then ( - error "qcow-tool failed, returning VDI_IO_ERROR" ; - raise - (Api_errors.Server_error - (Api_errors.vdi_io_error, ["Device I/O errors"]) - ) - ) - ) - with - | Success (out, _) -> - debug "qcow-tool successful export (%s)" out - | Failure (out, _e) -> - error "qcow-tool output: %s" out ; - raise (Api_errors.Server_error (Api_errors.vdi_io_error, [out])) - let update_task_progress (__context : Context.t) (x : int) = TaskHelper.set_progress ~__context (float_of_int x /. 100.) @@ -49,7 +19,7 @@ let receive (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) = let args = ["stream_decode"; path] in let qcow_tool = !Xapi_globs.qcow_stream_tool in - run_qcow_tool qcow_tool progress_cb args ~input_fd:unix_fd + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args ~input_fd:unix_fd let read_header qcow_path = let args = ["read_headers"; qcow_path] in @@ -58,28 +28,21 @@ let read_header qcow_path = let progress_cb _ = () in Xapi_stdext_pervasives.Pervasiveext.finally - (fun () -> run_qcow_tool qcow_tool progress_cb args ~output_fd:pipe_writer) + (fun () -> + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args + ~output_fd:pipe_writer + ) (fun () -> Unix.close pipe_writer) ; pipe_reader let parse_header qcow_path = let pipe_reader = read_header qcow_path in - let ic = Unix.in_channel_of_descr pipe_reader in - let buf = Buffer.create 4096 in - let json = Yojson.Basic.from_channel ~buf ~fname:"qcow_header.json" ic in - In_channel.close ic ; - let 
cluster_size = - 1 lsl Yojson.Basic.Util.(member "cluster_bits" json |> to_int) - in - let cluster_list = - Yojson.Basic.Util.(member "data_clusters" json |> to_list |> List.map to_int) - in - (cluster_size, cluster_list) + Vhd_qcow_parsing.parse_header pipe_reader let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) (path : string) (_size : Int64.t) = let qcow_of_device = - Vhd_tool_wrapper.backing_file_of_device ~driver:"qcow2" + Xapi_vdi_helpers.backing_file_of_device_with_driver ~driver:"qcow2" in let qcow_path = qcow_of_device path in @@ -107,8 +70,8 @@ let send ?relative_to (progress_cb : int -> unit) (unix_fd : Unix.file_descr) let replace_fds = Option.map (fun fd -> [(unique_string, fd)]) diff_fd in Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> - run_qcow_tool qcow_tool progress_cb args ?input_fd ~output_fd:unix_fd - ?replace_fds + Vhd_qcow_parsing.run_tool qcow_tool progress_cb args ?input_fd + ~output_fd:unix_fd ?replace_fds ) (fun () -> Option.iter Unix.close input_fd ; diff --git a/ocaml/xapi/storage_smapiv1_migrate.ml b/ocaml/xapi/storage_smapiv1_migrate.ml index d64ba5d0d44..5ccf597c869 100644 --- a/ocaml/xapi/storage_smapiv1_migrate.ml +++ b/ocaml/xapi/storage_smapiv1_migrate.ml @@ -106,18 +106,16 @@ let tapdisk_of_attach_info (backend : Storage_interface.backend) = match (blockdevices, nbds) with | blockdevice :: _, _ -> ( let path = blockdevice.Storage_interface.path in - try - match Tapctl.of_device (Tapctl.create ()) path with - | tapdev, _, _ -> - Some tapdev - with - | Tapctl.Not_blktap -> + match Tapctl.of_device (Tapctl.create ()) path with + | Some (tapdev, _, _) -> + Some tapdev + | exception Tapctl.Not_blktap -> D.debug "Device %s is not controlled by blktap" path ; None - | Tapctl.Not_a_device -> + | exception Tapctl.Not_a_device -> D.debug "%s is not a device" path ; None - | _ -> + | (exception _) | None -> D.debug "Device %s has an unknown driver" path ; None ) @@ -295,8 +293,8 @@ module Copy = struct 
perform_cleanup_actions !on_fail ; raise e - (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will - find the nearest vdi on the [dest] sr, and if there is no such vdi, it will + (** [copy_into_sr] does not requires a dest vdi to be provided, instead, it will + find the nearest vdi on the [dest] sr, and if there is no such vdi, it will create one. *) let copy_into_sr ~task ~dbg ~sr ~vdi ~vm ~url ~dest ~verify_dest = D.debug "copy sr:%s vdi:%s url:%s dest:%s verify_dest:%B" diff --git a/ocaml/xapi/stream_vdi.ml b/ocaml/xapi/stream_vdi.ml index 84765aeaf7b..f3bc5c8fc02 100644 --- a/ocaml/xapi/stream_vdi.ml +++ b/ocaml/xapi/stream_vdi.ml @@ -17,6 +17,7 @@ module Zerocheck = Xapi_stdext_zerocheck.Zerocheck module Unixext = Xapi_stdext_unix.Unixext +module ChunkSet = Set.Make (Int) let finally = Xapi_stdext_pervasives.Pervasiveext.finally @@ -131,63 +132,6 @@ let write_block ~__context filename buffer ofd len = else raise e -let get_device_numbers path = - let rdev = (Unix.LargeFile.stat path).Unix.LargeFile.st_rdev in - let major = rdev / 256 and minor = rdev mod 256 in - (major, minor) - -let is_nbd_device path = - let nbd_device_num = 43 in - let major, _ = get_device_numbers path in - major = nbd_device_num - -type nbd_connect_info = {path: string; exportname: string} [@@deriving rpc] - -let get_nbd_device path = - let nbd_device_prefix = "/dev/nbd" in - if - Astring.String.is_prefix ~affix:nbd_device_prefix path && is_nbd_device path - then - let nbd_number = - Astring.String.with_range ~first:(String.length nbd_device_prefix) path - in - let {path; exportname} = - (* persistent_nbd_info_dir is written from nbd_client_manager.py as part of VBD plug*) - let persistent_nbd_info_dir = "/var/run/nonpersistent/nbd" in - let filename = persistent_nbd_info_dir ^ "/" ^ nbd_number in - Xapi_stdext_unix.Unixext.string_of_file filename - |> Jsonrpc.of_string - |> nbd_connect_info_of_rpc - in - Some (path, exportname) - else - None - -(* Copied from 
vhd-tool/src/image.ml. - * Just keep the situation of xapi doesn't depend on vhd-tool OCaml module. - *) -let image_behind_nbd_device = function - | Some (path, _exportname) as image -> - (* The nbd server path exposed by tapdisk can lead us to the actual image - file below. Following the symlink gives a path like - `/run/blktap-control/nbd.`, - containing the tapdisk pid and minor number. Using this information, - we can get the file path from tap-ctl. - *) - let default _ _ = image in - let filename = Unix.realpath path |> Filename.basename in - Scanf.ksscanf filename default "nbd%d.%d" (fun pid minor -> - match Tapctl.find (Tapctl.create ()) ~pid ~minor with - | _, _, Some ("vhd", vhd) -> - Some ("vhd", vhd) - | _, _, Some ("aio", vhd) -> - Some ("raw", vhd) - | _, _, _ | (exception _) -> - None - ) - | _ -> - None - type extent = {flags: int32; length: int64} [@@deriving rpc] type extent_list = extent list [@@deriving rpc] @@ -251,143 +195,227 @@ let get_chunk_numbers_in_increasing_order descriptor_list offset = let chunks = process [] offset descriptor_list in List.rev chunks +let get_allocated_chunks_from_clusters cluster_size cluster_list = + let chunk_size = Int64.to_int chunk_size in + let chunks_in_cluster = (cluster_size + chunk_size - 1) / chunk_size in + let set = + List.fold_left + (fun set cluster_no -> + let cluster_offset = cluster_no * cluster_size in + let chunk_no = cluster_offset / chunk_size in + let chunks_to_add = + Seq.init chunks_in_cluster (fun i -> chunk_no + i) + in + ChunkSet.add_seq chunks_to_add set + ) + ChunkSet.empty cluster_list + in + set + +let send_one ofd (__context : Context.t) rpc session_id progress refresh_session + (prefix, vdi_ref, _size) = + let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in + (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) + let time_since_transmission = ref (Mtime_clock.counter ()) in + let need_to_retransmit time_since = + 
Mtime.Span.(is_longer ~than:(5 * s) time_since) + in + let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in + + (* Generic function that reads a chunk of [this_chunk_size] at [offset] and, + if [write_check chunk] is true, then writes the chunk to the filename with + [this_chunk_no] suffix *) + let actually_write_chunk ~(this_chunk_no : int) ~(offset : int64) + ~(this_chunk_size : int) ~ifd ~write_check ~first_or_last ~seek = + let buffer = + if this_chunk_size = Int64.to_int chunk_size then + reusable_buffer + else + Bytes.make this_chunk_size '\000' + in + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in + if seek then + Unix.LargeFile.lseek ifd offset Unix.SEEK_SET |> ignore ; + Unixext.really_read ifd buffer 0 this_chunk_size ; + if write_check buffer first_or_last then ( + time_since_transmission := Mtime_clock.counter () ; + write_block ~__context filename buffer ofd this_chunk_size + ) ; + made_progress __context progress (Int64.of_int this_chunk_size) + in + + with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 + (fun ifd dom0_path -> + match Xapi_vdi_helpers.get_nbd_device dom0_path with + | None -> ( + (* NB. It used to be that chunks could be larger than a native int *) + (* could handle, but this is no longer the case! 
Ensure all chunks *) + (* are strictly less than 2^30 bytes *) + let write_chunk (this_chunk_no : int) (offset : int64) + ~write_check ~seek ~timeout_workaround = + refresh_session () ; + let remaining = Int64.sub size offset in + if remaining > 0L then + let this_chunk_size = + min (Int64.to_int remaining) (Int64.to_int chunk_size) + in + let last_chunk = this_chunk_size = Int64.to_int remaining in + (* We always include the first and last blocks *) + let first_or_last = this_chunk_no = 0 || last_chunk in + if + need_to_retransmit (Mtime_clock.count !time_since_transmission) + && (not first_or_last) + && timeout_workaround + then ( + time_since_transmission := Mtime_clock.counter () ; + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in + write_block ~__context filename Bytes.empty ofd 0 ; + (* no progress has been made *) + Some offset + ) else ( + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size + ~ifd ~write_check ~first_or_last ~seek ; + Some (Int64.add offset chunk_size) + ) + else + None + in + + (* Read all clusters and check if they are filled with zeros *) + let rec stream_from (this_chunk_no : int) (offset : int64) + ~write_check ~seek = + let new_offset = + write_chunk this_chunk_no offset ~write_check ~seek + ~timeout_workaround:true + in + Option.iter + (fun offset -> + stream_from (this_chunk_no + 1) offset ~write_check ~seek + ) + new_offset + in + let write_check buffer first_or_last = + first_or_last + || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) + in + + let backing_info = + Xapi_vdi_helpers.backing_info_of_device dom0_path + in + match backing_info with + | Some (driver, path) when driver = "vhd" || driver = "qcow2" -> ( + try + (* Read backing file headers, then only read and write + allocated clusters from the bitmap *) + let cluster_size, cluster_list = + match driver with + | "vhd" -> + Vhd_tool_wrapper.parse_header path + | "qcow2" -> + Qcow_tool_wrapper.parse_header path + | _ -> + failwith "unreachable" + in + let set = + get_allocated_chunks_from_clusters cluster_size cluster_list + in + (* First and last chunks are always written - it's a limitation + of the XVA format. The last chunk index is (size - 1) / chunk_size, so that a trailing partial chunk is the one that gets forced. *) + let last_chunk = + Int64.(to_int (div (sub size 1L) chunk_size)) + in + let set = set |> ChunkSet.add 0 |> ChunkSet.add last_chunk in + ChunkSet.iter + (fun this_chunk_no -> + let offset = Int64.(mul (of_int this_chunk_no) chunk_size) in + let (_ : int64 option) = + write_chunk this_chunk_no offset + ~write_check:(fun _ _ -> true) + ~seek:true ~timeout_workaround:false + in + () + ) + set + with e -> + debug "%s: Falling back to reading the whole raw disk after %s" + __FUNCTION__ (Printexc.to_string e) ; + stream_from 0 0L ~write_check ~seek:true (* seek: the attempt above may already have moved the descriptor with lseek *) + ) + | _ -> + stream_from 0 0L ~write_check ~seek:false + ) + | Some (path, exportname) -> + let rec stream_from_offset (offset : int64) = + let remaining = Int64.sub size offset in + if remaining > 0L then ( + let this_chunk_size = + min (Int64.to_int chunk_size) (Int64.to_int remaining) + in + let this_chunk_no = Int64.(to_int (div offset chunk_size)) in + let first_or_last = offset = 0L || remaining <= chunk_size in + if + first_or_last + || need_to_retransmit + (Mtime_clock.count !time_since_transmission) + then ( + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size + ~ifd ~first_or_last + ~write_check:(fun _ _ -> true) + ~seek:true ; + stream_from_offset + (Int64.add offset (Int64.of_int this_chunk_size)) + ) else + let remaining = + Int64.mul + (Int64.div (Int64.sub remaining 1L) chunk_size) + chunk_size + in + (* Get sparseness for next 10GB or until the end rounded by last chunk, whichever is smaller *) + let sparseness_size = min max_sparseness_size remaining in + let output, _ = + Forkhelpers.execute_command_get_output + Xapi_globs.get_nbd_extents + [ + "--path" + ; path + ; "--exportname" + ; exportname + ; "--offset" + ; Int64.to_string offset + ; "--length" + ; Int64.to_string sparseness_size + ] + in + let 
extents = extent_list_of_rpc (Jsonrpc.of_string output) in + let chunks = + get_chunk_numbers_in_increasing_order extents offset + in + List.iter + (fun chunk -> + let offset = Int64.mul chunk chunk_size in + actually_write_chunk ~this_chunk_no:(Int64.to_int chunk) + ~offset ~this_chunk_size:(Int64.to_int chunk_size) ~ifd + ~first_or_last:false + ~write_check:(fun _ _ -> true) + ~seek:true + ) + chunks ; + stream_from_offset (Int64.add offset sparseness_size) + ) + in + stream_from_offset 0L + ) ; + debug "Finished streaming VDI" + (** Stream a set of VDIs split into chunks in a tar format in a defined order. Return an association list mapping tar filename -> string (containing the SHA1 checksums) *) let send_all refresh_session ofd ~__context rpc session_id (prefix_vdis : vdi list) = TaskHelper.set_cancellable ~__context ; let progress = new_progress_record __context prefix_vdis in - let send_one ofd (__context : Context.t) (prefix, vdi_ref, _size) = - let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in - let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in - with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 - (fun ifd dom0_path -> - match get_nbd_device dom0_path with - | None -> - (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) - let last_transmission_time = ref 0. in - (* NB. It used to be that chunks could be larger than a native int *) - (* could handle, but this is no longer the case! 
Ensure all chunks *) - (* are strictly less than 2^30 bytes *) - let rec stream_from (chunk_no : int) (offset : int64) = - refresh_session () ; - let remaining = Int64.sub size offset in - if remaining > 0L then ( - let this_chunk = min remaining chunk_size in - let last_chunk = this_chunk = remaining in - let this_chunk = Int64.to_int this_chunk in - let filename = Printf.sprintf "%s/%08d" prefix chunk_no in - let now = Unix.gettimeofday () in - let time_since_transmission = now -. !last_transmission_time in - (* We always include the first and last blocks *) - let first_or_last = chunk_no = 0 || last_chunk in - if time_since_transmission > 5. && not first_or_last then ( - last_transmission_time := now ; - write_block ~__context filename Bytes.empty ofd 0 ; - (* no progress has been made *) - stream_from (chunk_no + 1) offset - ) else - let buffer = - if Int64.of_int this_chunk = chunk_size then - reusable_buffer - else - Bytes.make this_chunk '\000' - in - Unixext.really_read ifd buffer 0 this_chunk ; - if - first_or_last - || not - (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) - then ( - last_transmission_time := now ; - write_block ~__context filename buffer ofd this_chunk - ) ; - made_progress __context progress (Int64.of_int this_chunk) ; - stream_from (chunk_no + 1) (Int64.add offset chunk_size) - ) - in - stream_from 0 0L - | Some (path, exportname) -> - let last_transmission_time = ref 0L in - let actually_write_chunk (this_chunk_no : int) - (this_chunk_size : int) = - let buffer = - if this_chunk_size = Int64.to_int chunk_size then - reusable_buffer - else - Bytes.make this_chunk_size '\000' - in - let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in - Unix.LargeFile.lseek ifd - (Int64.mul (Int64.of_int this_chunk_no) chunk_size) - Unix.SEEK_SET - |> ignore ; - Unixext.really_read ifd buffer 0 this_chunk_size ; - last_transmission_time := Mtime_clock.now_ns () ; - write_block ~__context filename buffer ofd this_chunk_size ; - 
made_progress __context progress (Int64.of_int this_chunk_size) - in - let rec stream_from_offset (offset : int64) = - let remaining = Int64.sub size offset in - if remaining > 0L then ( - let this_chunk_size = - min (Int64.to_int chunk_size) (Int64.to_int remaining) - in - let this_chunk_no = Int64.div offset chunk_size in - let now = Mtime_clock.now_ns () in - let time_since_transmission = - Int64.sub now !last_transmission_time - in - if - offset = 0L - || remaining <= chunk_size - || time_since_transmission > 5000000000L - then ( - actually_write_chunk - (Int64.to_int this_chunk_no) - this_chunk_size ; - stream_from_offset - (Int64.add offset (Int64.of_int this_chunk_size)) - ) else - let remaining = - Int64.mul - (Int64.div (Int64.sub remaining 1L) chunk_size) - chunk_size - in - (* Get sparseness for next 10GB or until the end rounded by last chunk, whichever is smaller *) - let sparseness_size = min max_sparseness_size remaining in - let output, _ = - Forkhelpers.execute_command_get_output - Xapi_globs.get_nbd_extents - [ - "--path" - ; path - ; "--exportname" - ; exportname - ; "--offset" - ; Int64.to_string offset - ; "--length" - ; Int64.to_string sparseness_size - ] - in - let extents = extent_list_of_rpc (Jsonrpc.of_string output) in - let chunks = - get_chunk_numbers_in_increasing_order extents offset - in - List.iter - (fun chunk -> - actually_write_chunk (Int64.to_int chunk) - (Int64.to_int chunk_size) - ) - chunks ; - stream_from_offset (Int64.add offset sparseness_size) - ) - in - stream_from_offset 0L - ) ; - debug "Finished streaming VDI" - in - for_each_vdi __context (send_one ofd __context) prefix_vdis + for_each_vdi __context + (send_one ofd __context rpc session_id progress refresh_session) + prefix_vdis exception Invalid_checksum of string diff --git a/ocaml/xapi/vhd_qcow_parsing.ml b/ocaml/xapi/vhd_qcow_parsing.ml new file mode 100644 index 00000000000..0043edeeaf2 --- /dev/null +++ b/ocaml/xapi/vhd_qcow_parsing.ml @@ -0,0 +1,56 @@ +(* + * 
Copyright (C) 2025 Vates. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = __MODULE__ end) + +open D +(** [run_tool tool ?replace_fds ?input_fd ?output_fd _progress_cb args] executes [tool] with [args] through [Forkhelpers], waits for it to exit, and raises [Api_errors.Server_error (vdi_io_error, _)] unless it exited with status 0. The progress callback is currently unused. NOTE(review): the log-file prefix is the basename of [tool] on the assumption that [Forkhelpers.with_logfile_fd] uses the prefix to name a temporary file - confirm. *) +let run_tool tool ?(replace_fds = []) ?input_fd ?output_fd + (_progress_cb : int -> unit) (args : string list) = + info "Executing %s %s" tool (String.concat " " args) ; + let open Forkhelpers in + match + with_logfile_fd (Filename.basename tool) (fun log_fd -> + let pid = + safe_close_and_exec input_fd output_fd (Some log_fd) replace_fds tool + args + in + let _, status = waitpid pid in + if status <> Unix.WEXITED 0 then ( + error "%s failed, returning VDI_IO_ERROR" tool ; + raise + (Api_errors.Server_error + (Api_errors.vdi_io_error, ["Device I/O errors"]) + ) + ) + ) + with + | Success (out, _) -> + debug "%s successful export (%s)" tool out + | Failure (out, _e) -> + error "%s output: %s" tool out ; + raise (Api_errors.Server_error (Api_errors.vdi_io_error, [out])) +(** [parse_header pipe_reader] reads one JSON document from [pipe_reader] and returns [(1 lsl cluster_bits, data_clusters)] taken from its [cluster_bits] and [data_clusters] members. The channel wrapping [pipe_reader] is closed even when JSON parsing raises. *) +let parse_header pipe_reader = + let ic = Unix.in_channel_of_descr pipe_reader in + let buf = Buffer.create 4096 in + let json = Fun.protect ~finally:(fun () -> In_channel.close ic) (fun () -> + Yojson.Basic.from_channel ~buf ~fname:"header.json" ic ) in + let cluster_size = + 1 lsl Yojson.Basic.Util.(member "cluster_bits" json |> to_int) + in + let cluster_list = + Yojson.Basic.Util.(member "data_clusters" json |> to_list |> List.map to_int) + in + (cluster_size, cluster_list) diff --git a/ocaml/xapi/vhd_qcow_parsing.mli b/ocaml/xapi/vhd_qcow_parsing.mli new file mode 100644 index 00000000000..25417c0b91c --- /dev/null +++ 
b/ocaml/xapi/vhd_qcow_parsing.mli @@ -0,0 +1,24 @@ +(* + * Copyright (C) 2025 Vates. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +val run_tool : + string + -> ?replace_fds:(string * Unix.file_descr) list + -> ?input_fd:Unix.file_descr + -> ?output_fd:Unix.file_descr + -> (int -> unit) + -> string list + -> unit + +val parse_header : Unix.file_descr -> int * int list diff --git a/ocaml/xapi/vhd_tool_wrapper.ml b/ocaml/xapi/vhd_tool_wrapper.ml index f3f791fe251..168b3fa39a5 100644 --- a/ocaml/xapi/vhd_tool_wrapper.ml +++ b/ocaml/xapi/vhd_tool_wrapper.ml @@ -18,7 +18,6 @@ module D = Debug.Make (struct let name = "vhd_tool_wrapper" end) open D -open Xapi_stdext_std.Xstringext (* .vhds on XenServer are sometimes found via /dev/mapper *) let vhd_search_path = "/dev/mapper:." @@ -113,81 +112,33 @@ let receive progress_cb format protocol (s : Unix.file_descr) in run_vhd_tool progress_cb args s s' path -(** [find_backend_device path] returns [Some path'] where [path'] is the backend path in - the driver domain corresponding to the frontend device [path] in this domain. *) -let find_backend_device path = - try - let open Ezxenstore_core.Xenstore in - (* If we're looking at a xen frontend device, see if the backend - is in the same domain. 
If so check if it looks like a .vhd *) - let rdev = (Unix.stat path).Unix.st_rdev in - let major = rdev / 256 and minor = rdev mod 256 in - let link = - Unix.readlink (Printf.sprintf "/sys/dev/block/%d:%d/device" major minor) - in - match List.rev (String.split '/' link) with - | id :: "xen" :: "devices" :: _ - when Astring.String.is_prefix ~affix:"vbd-" id -> - let id = int_of_string (String.sub id 4 (String.length id - 4)) in - with_xs (fun xs -> - let self = xs.Xs.read "domid" in - let backend = - xs.Xs.read (Printf.sprintf "device/vbd/%d/backend" id) - in - let params = xs.Xs.read (Printf.sprintf "%s/params" backend) in - match String.split '/' backend with - | "local" :: "domain" :: bedomid :: _ -> - if not (self = bedomid) then - Helpers.internal_error - "find_backend_device: Got domid %s but expected %s" bedomid - self ; - Some params - | _ -> - raise Not_found - ) - | _ -> - raise Not_found - with _ -> None +let read_vhd_header path = + let vhd_tool = !Xapi_globs.vhd_tool in + let args = ["read_headers"; path] in + let pipe_reader, pipe_writer = Unix.pipe ~cloexec:true () in -(** [backing_file_of_device path] returns (Some backing_file) where 'backing_file' - is the leaf backing a particular device [path] (with a driver of type - [driver] or None. [path] may either be a blktap2 device *or* a blkfront - device backed by a blktap2 device. If the latter then the script must be - run in the same domain as blkback. 
*) -let backing_file_of_device path ~driver = - let tapdisk_of_path path = - try - match Tapctl.of_device (Tapctl.create ()) path with - | _, _, Some (typ, backing_file) when typ = driver -> - Some backing_file - | _, _, _ -> - raise Not_found - with - | Tapctl.Not_blktap -> ( - debug "Device %s is not controlled by blktap" path ; - (* Check if it is a [driver] behind a NBD device *) - Stream_vdi.(get_nbd_device path |> image_behind_nbd_device) |> function - | Some (typ, backing_file) when typ = driver -> - debug "%s is a %s behind NBD device %s" backing_file driver path ; - Some backing_file - | _ -> - None - ) - | Tapctl.Not_a_device -> - debug "%s is not a device" path ; - None - | _ -> - debug "Device %s has an unknown driver" path ; - None - in - find_backend_device path |> Option.value ~default:path |> tapdisk_of_path + let progress_cb _ = () in + Xapi_stdext_pervasives.Pervasiveext.finally + (fun () -> + Vhd_qcow_parsing.run_tool vhd_tool progress_cb args ~output_fd:pipe_writer + ) + (fun () -> Unix.close pipe_writer) ; + pipe_reader + +let parse_header vhd_path = + let pipe_reader = read_vhd_header vhd_path in + Vhd_qcow_parsing.parse_header pipe_reader let send progress_cb ?relative_to (protocol : string) (dest_format : string) (s : Unix.file_descr) (path : string) (size : Int64.t) (prefix : string) = - let vhd_of_device = backing_file_of_device ~driver:"vhd" in + let vhd_of_device = + Xapi_vdi_helpers.backing_file_of_device_with_driver ~driver:"vhd" + in let s' = Uuidx.(to_string (make ())) in let source_format, source = - match (Stream_vdi.get_nbd_device path, vhd_of_device path, relative_to) with + match + (Xapi_vdi_helpers.get_nbd_device path, vhd_of_device path, relative_to) + with | Some (nbd_server, exportname), _, None -> ( "nbdhybrid" , Printf.sprintf "%s:%s:%s:%Ld" path nbd_server exportname size diff --git a/ocaml/xapi/xapi_vdi_helpers.ml b/ocaml/xapi/xapi_vdi_helpers.ml index 84db627c719..7192ecb6fd6 100644 --- a/ocaml/xapi/xapi_vdi_helpers.ml 
+++ b/ocaml/xapi/xapi_vdi_helpers.ml @@ -300,3 +300,135 @@ let read_raw ~__context ~vdi = Some (VDI_CStruct.read cstruct) ) ) + +type nbd_connect_info = {path: string; exportname: string} [@@deriving rpc] + +let get_device_numbers path = (* Returns [(major, minor)] for the node at [path], split from [st_rdev] as [rdev / 256] and [rdev mod 256]. NOTE(review): this split presumably assumes minors below 256 - confirm for larger minors. *) + let rdev = (Unix.LargeFile.stat path).Unix.LargeFile.st_rdev in + let major = rdev / 256 and minor = rdev mod 256 in + (major, minor) + +let is_nbd_device path = (* True when the major number of [path] equals [nbd_device_num] (43). *) + let nbd_device_num = 43 in + let major, _ = get_device_numbers path in + major = nbd_device_num + +let get_nbd_device path = (* Returns [Some (path, exportname)] loaded from the per-device info file when the argument starts with "/dev/nbd" and [is_nbd_device] holds for it, otherwise [None]. Errors while reading or decoding that file are not handled here. *) + let nbd_device_prefix = "/dev/nbd" in + if + Astring.String.is_prefix ~affix:nbd_device_prefix path && is_nbd_device path + then + let nbd_number = + Astring.String.with_range ~first:(String.length nbd_device_prefix) path + in + let {path; exportname} = + (* persistent_nbd_info_dir is written by nbd_client_manager.py as part of VBD plug; the [path] bound by this record pattern shadows the function argument from here on *) + let persistent_nbd_info_dir = "/var/run/nonpersistent/nbd" in + let filename = persistent_nbd_info_dir ^ "/" ^ nbd_number in + Xapi_stdext_unix.Unixext.string_of_file filename + |> Jsonrpc.of_string + |> nbd_connect_info_of_rpc + in + Some (path, exportname) + else + None + +(* Copied from vhd-tool/src/image.ml so that xapi does not have to depend on the + * vhd-tool OCaml module. Given [Some (path, exportname)] for an NBD export, asks tap-ctl about the tapdisk behind it and returns [Some ("vhd", file)] for a "vhd" tapdisk or [Some ("raw", file)] for an "aio" one; returns [None] for any other tapdisk type, when [Tapctl.find] raises, and for [None] input. NOTE(review): when the resolved file name does not match "nbd%d.%d" the input pair itself is returned unchanged - confirm callers expect that. + *) +let image_behind_nbd_device = function + | Some (path, _exportname) as image -> + (* The nbd server path exposed by tapdisk can lead us to the actual image + file below. Following the symlink gives a path like + `/run/blktap-control/nbd<pid>.<minor>`, + containing the tapdisk pid and minor number. Using this information, + we can get the file path from tap-ctl. 
+ *) + let default _ _ = image in + let filename = Unix.realpath path |> Filename.basename in + Scanf.ksscanf filename default "nbd%d.%d" (fun pid minor -> + match Tapctl.find (Tapctl.create ()) ~pid ~minor with + | _, _, Some ("vhd", vhd) -> + Some ("vhd", vhd) + | _, _, Some ("aio", vhd) -> + Some ("raw", vhd) + | _, _, _ | (exception _) -> + None + ) + | _ -> + None + +(** [find_backend_device path] returns [Some path'] where [path'] is the backend path in + the driver domain corresponding to the frontend device [path] in this domain, or [None] if any step fails (every exception is caught by the final [with _ -> None]). *) +let find_backend_device path = + try + let open Ezxenstore_core.Xenstore in + (* If we are looking at a xen frontend device, see if the backend + is in the same domain. If so return the [params] key of that backend *) + let rdev = (Unix.stat path).Unix.st_rdev in + let major = rdev / 256 and minor = rdev mod 256 in + let link = + Unix.readlink (Printf.sprintf "/sys/dev/block/%d:%d/device" major minor) + in + match List.rev (Xapi_stdext_std.Xstringext.String.split '/' link) with + | id :: "xen" :: "devices" :: _ + when Astring.String.is_prefix ~affix:"vbd-" id -> + let id = int_of_string (String.sub id 4 (String.length id - 4)) in + with_xs (fun xs -> + let self = xs.Xs.read "domid" in + let backend = + xs.Xs.read (Printf.sprintf "device/vbd/%d/backend" id) + in + let params = xs.Xs.read (Printf.sprintf "%s/params" backend) in + match Xapi_stdext_std.Xstringext.String.split '/' backend with + | "local" :: "domain" :: bedomid :: _ -> + if not (self = bedomid) then + Helpers.internal_error + "find_backend_device: Got domid %s but expected %s" bedomid + self ; + Some params + | _ -> + raise Not_found + ) + | _ -> + raise Not_found + with _ -> None + +(** [backing_info_of_device path] returns [Some (driver_type, backing_file)] for the + leaf backing a particular device [path], or [None] when no backing info is found. [path] is first mapped through [find_backend_device], falling back to [path] itself. 
*) +let backing_info_of_device path = + let tapdisk_of_path path = + try + let ( let* ) = Option.bind in (* a [None] from [Tapctl.of_device] short-circuits to [None] *) + let* _, _, backing_info = Tapctl.of_device (Tapctl.create ()) path in + backing_info + with (* any exception other than the two handled below propagates to the caller *) + | Tapctl.Not_a_device -> + debug "%s is not a device" path ; + None + | Tapctl.Not_blktap -> ( + debug "Device %s is not controlled by blktap" path ; + (* Not a blktap device: check whether an image is served behind an NBD device instead *) + get_nbd_device path |> image_behind_nbd_device |> function + | Some (typ, backing_file) as backing_info -> + debug "%s is a %s behind NBD device %s" backing_file typ path ; + backing_info + | _ -> + None + ) + in + find_backend_device path |> Option.value ~default:path |> tapdisk_of_path + +(** [backing_file_of_device_with_driver path ~driver] returns [Some backing_file] + where [backing_file] is the leaf backing a particular device [path] + (with a driver of type [driver]) or [None]. + [path] may either be a blktap2 device *or* a blkfront device backed by a + blktap2 device. If the latter then the script must be + run in the same domain as blkback. *) +let backing_file_of_device_with_driver path ~driver = + match backing_info_of_device path with + | Some (typ, backing_file) when typ = driver -> + Some backing_file + | _ -> (* also taken when no backing info was found at all, not only on a driver mismatch *) + debug "Device %s has an unknown driver" path ; + None diff --git a/opam/vhd-tool.opam b/opam/vhd-tool.opam index a1e46091372..0edc71774b2 100644 --- a/opam/vhd-tool.opam +++ b/opam/vhd-tool.opam @@ -40,6 +40,7 @@ depends: [ "xen-api-client-lwt" {= version} "xenstore" "xenstore_transport" + "yojson" "odoc" {with-doc} ] build: [