|
17 | 17 |
|
18 | 18 | module Zerocheck = Xapi_stdext_zerocheck.Zerocheck |
19 | 19 | module Unixext = Xapi_stdext_unix.Unixext |
| 20 | +module ChunkSet = Set.Make (Int) |
20 | 21 |
|
21 | 22 | let finally = Xapi_stdext_pervasives.Pervasiveext.finally |
22 | 23 |
|
@@ -194,95 +195,174 @@ let get_chunk_numbers_in_increasing_order descriptor_list offset = |
194 | 195 | let chunks = process [] offset descriptor_list in |
195 | 196 | List.rev chunks |
196 | 197 |
|
(** [get_allocated_chunks_from_clusters cluster_size cluster_list] converts a
    list of allocated cluster numbers into the set of chunk numbers that
    those clusters overlap.

    Cluster [n] is taken to cover the byte range
    \[[n * cluster_size], [(n + 1) * cluster_size - 1]\]; a chunk is
    [chunk_size] bytes (the file-level constant). Every chunk touched by at
    least one byte of a listed cluster is included, so the result is correct
    even when neither size is a multiple of the other and a cluster straddles
    a chunk boundary.

    @param cluster_size size of one cluster, in bytes
    @param cluster_list numbers of the allocated clusters
    @return the set of chunk numbers that contain allocated data
    @raise Invalid_argument
      if [cluster_size] is not strictly positive while [cluster_list] is
      non-empty: such a header cannot describe a valid allocation map. *)
let get_allocated_chunks_from_clusters cluster_size cluster_list =
  let chunk_size = Int64.to_int chunk_size in
  (* Reject a nonsensical cluster size up front rather than silently
     producing an empty or bogus set, which would drop allocated data. An
     empty cluster list is harmless whatever the size is. *)
  if cluster_size <= 0 && cluster_list <> [] then
    invalid_arg
      (Printf.sprintf "%s: non-positive cluster size %d" __FUNCTION__
         cluster_size
      ) ;
  List.fold_left
    (fun set cluster_no ->
      (* Work from the cluster's byte range instead of a fixed chunk count:
         a cluster that is not chunk-aligned may spill into one more chunk
         than ceil(cluster_size / chunk_size) starting from the first. *)
      let first_byte = cluster_no * cluster_size in
      let last_byte = first_byte + cluster_size - 1 in
      let first_chunk = first_byte / chunk_size in
      let last_chunk = last_byte / chunk_size in
      let chunks_to_add =
        Seq.init (last_chunk - first_chunk + 1) (fun i -> first_chunk + i)
      in
      ChunkSet.add_seq chunks_to_add set
    )
    ChunkSet.empty cluster_list
| 214 | + |
197 | 215 | let send_one ofd (__context : Context.t) rpc session_id progress refresh_session |
198 | 216 | (prefix, vdi_ref, _size) = |
199 | 217 | let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in |
| 218 | + (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) |
| 219 | + let last_transmission_time = ref 0L in |
200 | 220 | let reusable_buffer = Bytes.make (Int64.to_int chunk_size) '\000' in |
| 221 | + |
| 222 | + (* Generic function that reads a chunk of [this_chunk_size] at [offset] and, |
| 223 | + if [write_check chunk] is true, then writes the chunk to the filename with |
| 224 | + [this_chunk_no] suffix *) |
| 225 | + let actually_write_chunk ~(this_chunk_no : int) ~(offset : int64) |
| 226 | + ~(this_chunk_size : int) ~ifd ~write_check ~first_or_last ~seek = |
| 227 | + let buffer = |
| 228 | + if this_chunk_size = Int64.to_int chunk_size then |
| 229 | + reusable_buffer |
| 230 | + else |
| 231 | + Bytes.make this_chunk_size '\000' |
| 232 | + in |
| 233 | + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in |
| 234 | + if seek then |
| 235 | + Unix.LargeFile.lseek ifd offset Unix.SEEK_SET |> ignore ; |
| 236 | + Unixext.really_read ifd buffer 0 this_chunk_size ; |
| 237 | + if write_check buffer first_or_last then ( |
| 238 | + last_transmission_time := Mtime_clock.now_ns () ; |
| 239 | + write_block ~__context filename buffer ofd this_chunk_size |
| 240 | + ) ; |
| 241 | + made_progress __context progress (Int64.of_int this_chunk_size) |
| 242 | + in |
| 243 | + |
201 | 244 | with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 |
202 | 245 | (fun ifd dom0_path -> |
203 | 246 | match Xapi_vdi_helpers.get_nbd_device dom0_path with |
204 | | - | None -> |
205 | | - (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) |
206 | | - let last_transmission_time = ref 0. in |
| 247 | + | None -> ( |
207 | 248 | (* NB. It used to be that chunks could be larger than a native int *) |
208 | 249 | (* could handle, but this is no longer the case! Ensure all chunks *) |
209 | 250 | (* are strictly less than 2^30 bytes *) |
210 | | - let rec stream_from (chunk_no : int) (offset : int64) = |
| 251 | + let rec write_chunk (this_chunk_no : int) (offset : int64) |
| 252 | + ~write_check ~seek ~timeout_workaround = |
211 | 253 | refresh_session () ; |
212 | 254 | let remaining = Int64.sub size offset in |
213 | | - if remaining > 0L then ( |
214 | | - let this_chunk = min remaining chunk_size in |
215 | | - let last_chunk = this_chunk = remaining in |
216 | | - let this_chunk = Int64.to_int this_chunk in |
217 | | - let filename = Printf.sprintf "%s/%08d" prefix chunk_no in |
218 | | - let now = Unix.gettimeofday () in |
219 | | - let time_since_transmission = now -. !last_transmission_time in |
| 255 | + if remaining > 0L then |
| 256 | + let this_chunk_size = |
| 257 | + min (Int64.to_int remaining) (Int64.to_int chunk_size) |
| 258 | + in |
| 259 | + let last_chunk = this_chunk_size = Int64.to_int remaining in |
| 260 | + let now = Mtime_clock.now_ns () in |
| 261 | + let time_since_transmission = |
| 262 | + Int64.sub now !last_transmission_time |
| 263 | + in |
220 | 264 | (* We always include the first and last blocks *) |
221 | | - let first_or_last = chunk_no = 0 || last_chunk in |
222 | | - if time_since_transmission > 5. && not first_or_last then ( |
| 265 | + let first_or_last = this_chunk_no = 0 || last_chunk in |
| 266 | + if |
| 267 | + time_since_transmission > 5_000_000_000L |
| 268 | + && (not first_or_last) |
| 269 | + && timeout_workaround |
| 270 | + then ( |
223 | 271 | last_transmission_time := now ; |
| 272 | + let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in |
224 | 273 | write_block ~__context filename Bytes.empty ofd 0 ; |
225 | 274 | (* no progress has been made *) |
226 | | - stream_from (chunk_no + 1) offset |
227 | | - ) else |
228 | | - let buffer = |
229 | | - if Int64.of_int this_chunk = chunk_size then |
230 | | - reusable_buffer |
231 | | - else |
232 | | - Bytes.make this_chunk '\000' |
233 | | - in |
234 | | - Unixext.really_read ifd buffer 0 this_chunk ; |
235 | | - if |
236 | | - first_or_last |
237 | | - || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) |
238 | | - then ( |
239 | | - last_transmission_time := now ; |
240 | | - write_block ~__context filename buffer ofd this_chunk |
241 | | - ) ; |
242 | | - made_progress __context progress (Int64.of_int this_chunk) ; |
243 | | - stream_from (chunk_no + 1) (Int64.add offset chunk_size) |
244 | | - ) |
| 275 | + Some offset |
| 276 | + ) else ( |
| 277 | + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size |
| 278 | + ~ifd ~write_check ~first_or_last ~seek ; |
| 279 | + Some (Int64.add offset chunk_size) |
| 280 | + ) |
| 281 | + else |
| 282 | + None |
245 | 283 | in |
246 | | - stream_from 0 0L |
247 | | - | Some (path, exportname) -> |
248 | | - let last_transmission_time = ref 0L in |
249 | | - let actually_write_chunk (this_chunk_no : int) (this_chunk_size : int) |
250 | | - = |
251 | | - let buffer = |
252 | | - if this_chunk_size = Int64.to_int chunk_size then |
253 | | - reusable_buffer |
254 | | - else |
255 | | - Bytes.make this_chunk_size '\000' |
| 284 | + |
| 285 | + (* Read all clusters and check if they are filled with zeros *) |
| 286 | + let rec stream_from (this_chunk_no : int) (offset : int64) |
| 287 | + ~write_check ~seek = |
| 288 | + let new_offset = |
| 289 | + write_chunk this_chunk_no offset ~write_check ~seek |
| 290 | + ~timeout_workaround:true |
256 | 291 | in |
257 | | - let filename = Printf.sprintf "%s/%08d" prefix this_chunk_no in |
258 | | - Unix.LargeFile.lseek ifd |
259 | | - (Int64.mul (Int64.of_int this_chunk_no) chunk_size) |
260 | | - Unix.SEEK_SET |
261 | | - |> ignore ; |
262 | | - Unixext.really_read ifd buffer 0 this_chunk_size ; |
263 | | - last_transmission_time := Mtime_clock.now_ns () ; |
264 | | - write_block ~__context filename buffer ofd this_chunk_size ; |
265 | | - made_progress __context progress (Int64.of_int this_chunk_size) |
| 292 | + Option.iter |
| 293 | + (fun offset -> |
| 294 | + stream_from (this_chunk_no + 1) offset ~write_check ~seek |
| 295 | + ) |
| 296 | + new_offset |
| 297 | + in |
| 298 | + let write_check buffer first_or_last = |
| 299 | + first_or_last |
| 300 | + || not (Zerocheck.is_all_zeros (Bytes.unsafe_to_string buffer)) |
| 301 | + in |
| 302 | + |
| 303 | + let backing_info = |
| 304 | + Xapi_vdi_helpers.backing_info_of_device dom0_path |
266 | 305 | in |
| 306 | + match backing_info with |
| 307 | + | Some (driver, path) when driver = "vhd" || driver = "qcow2" -> ( |
| 308 | + try |
| 309 | + (* Read backing file headers, then only read and write |
| 310 | + allocated clusters from the bitmap *) |
| 311 | + let cluster_size, cluster_list = |
| 312 | + match driver with |
| 313 | + | "vhd" -> |
| 314 | + Vhd_tool_wrapper.parse_header path |
| 315 | + | "qcow2" -> |
| 316 | + Qcow_tool_wrapper.parse_qcow_header path |
| 317 | + | _ -> |
| 318 | + failwith "unreachable" |
| 319 | + in |
| 320 | + let set = |
| 321 | + get_allocated_chunks_from_clusters cluster_size cluster_list |
| 322 | + in |
| 323 | + (* First and last chunks are always written - it's a limitation |
| 324 | + of the XVA format *) |
| 325 | + let last_chunk = |
| 326 | + Int64.((to_int size - to_int chunk_size + 1) / to_int chunk_size) |
| 327 | + in |
| 328 | + let set = set |> ChunkSet.add 0 |> ChunkSet.add last_chunk in |
| 329 | + ChunkSet.iter |
| 330 | + (fun this_chunk_no -> |
| 331 | + let offset = Int64.(mul (of_int this_chunk_no) chunk_size) in |
| 332 | + let _ = |
| 333 | + write_chunk this_chunk_no offset |
| 334 | + ~write_check:(fun _ _ -> true) |
| 335 | + ~seek:true ~timeout_workaround:false |
| 336 | + in |
| 337 | + () |
| 338 | + ) |
| 339 | + set |
| 340 | + with e -> |
| 341 | + debug "%s: Falling back to reading the whole raw disk after %s" |
| 342 | + __FUNCTION__ (Printexc.to_string e) ; |
| 343 | + stream_from 0 0L ~write_check ~seek:false |
| 344 | + ) |
| 345 | + | _ -> |
| 346 | + stream_from 0 0L ~write_check ~seek:false |
| 347 | + ) |
| 348 | + | Some (path, exportname) -> |
267 | 349 | let rec stream_from_offset (offset : int64) = |
268 | 350 | let remaining = Int64.sub size offset in |
269 | 351 | if remaining > 0L then ( |
270 | 352 | let this_chunk_size = |
271 | 353 | min (Int64.to_int chunk_size) (Int64.to_int remaining) |
272 | 354 | in |
273 | | - let this_chunk_no = Int64.div offset chunk_size in |
| 355 | + let this_chunk_no = Int64.(to_int (div offset chunk_size)) in |
274 | 356 | let now = Mtime_clock.now_ns () in |
275 | 357 | let time_since_transmission = |
276 | 358 | Int64.sub now !last_transmission_time |
277 | 359 | in |
278 | | - if |
279 | | - offset = 0L |
280 | | - || remaining <= chunk_size |
281 | | - || time_since_transmission > 5000000000L |
282 | | - then ( |
283 | | - actually_write_chunk |
284 | | - (Int64.to_int this_chunk_no) |
285 | | - this_chunk_size ; |
| 360 | + let first_or_last = offset = 0L || remaining <= chunk_size in |
| 361 | + if first_or_last || time_since_transmission > 5000000000L then ( |
| 362 | + actually_write_chunk ~this_chunk_no ~offset ~this_chunk_size |
| 363 | + ~ifd ~first_or_last |
| 364 | + ~write_check:(fun _ _ -> true) |
| 365 | + ~seek:true ; |
286 | 366 | stream_from_offset |
287 | 367 | (Int64.add offset (Int64.of_int this_chunk_size)) |
288 | 368 | ) else |
@@ -313,8 +393,12 @@ let send_one ofd (__context : Context.t) rpc session_id progress refresh_session |
313 | 393 | in |
314 | 394 | List.iter |
315 | 395 | (fun chunk -> |
316 | | - actually_write_chunk (Int64.to_int chunk) |
317 | | - (Int64.to_int chunk_size) |
| 396 | + let offset = Int64.mul chunk chunk_size in |
| 397 | + actually_write_chunk ~this_chunk_no:(Int64.to_int chunk) |
| 398 | + ~offset ~this_chunk_size:(Int64.to_int chunk_size) ~ifd |
| 399 | + ~first_or_last:false |
| 400 | + ~write_check:(fun _ _ -> true) |
| 401 | + ~seek:true |
318 | 402 | ) |
319 | 403 | chunks ; |
320 | 404 | stream_from_offset (Int64.add offset sparseness_size) |
|
0 commit comments