Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run live migration directly on the state driver task #720

Merged
453 changes: 308 additions & 145 deletions bin/propolis-server/src/lib/migrate/destination.rs

Large diffs are not rendered by default.

205 changes: 12 additions & 193 deletions bin/propolis-server/src/lib/migrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,14 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::net::SocketAddr;

use bit_field::BitField;
use dropshot::HttpError;
use futures::{SinkExt, StreamExt};
use propolis::migrate::MigrateStateError;
use propolis_api_types::{self as api, MigrationState};
use propolis_api_types::MigrationState;
use serde::{Deserialize, Serialize};
use slog::{error, info, o};
use slog::error;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::{tungstenite, WebSocketStream};
use uuid::Uuid;

mod codec;
pub mod destination;
Expand All @@ -25,6 +18,15 @@ mod preamble;
pub mod protocol;
pub mod source;

/// Trait bounds for connection objects used in live migrations.
pub(crate) trait MigrateConn:
AsyncRead + AsyncWrite + Unpin + Send
{
}

impl MigrateConn for tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream> {}
impl MigrateConn for hyper::upgrade::Upgraded {}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum MigrateRole {
Source,
Expand Down Expand Up @@ -141,11 +143,6 @@ pub enum MigrateError {
/// The other end of the migration ran into an error
#[error("{0:?} migration instance encountered error: {1}")]
RemoteError(MigrateRole, String),

/// Sending/receiving from the VM state driver command/response channels
/// returned an error.
#[error("VM state driver unexpectedly closed channel")]
StateDriverChannelClosed,
}

impl From<tokio_tungstenite::tungstenite::Error> for MigrateError {
Expand Down Expand Up @@ -183,8 +180,7 @@ impl From<MigrateError> for HttpError {
| MigrateError::TimeData(_)
| MigrateError::DeviceState(_)
| MigrateError::RemoteError(_, _)
| MigrateError::StateMachine(_)
| MigrateError::StateDriverChannelClosed => {
| MigrateError::StateMachine(_) => {
HttpError::for_internal_error(msg)
}
MigrateError::MigrationAlreadyInProgress
Expand Down Expand Up @@ -219,183 +215,6 @@ struct DevicePayload {
pub data: String,
}

pub(crate) struct SourceContext<
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> {
pub conn: WebSocketStream<T>,
pub protocol: crate::migrate::protocol::Protocol,
}

/// Begin the migration process (source-side).
///
/// This will check protocol version and then begin the migration in a separate task.
pub async fn source_start<
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
>(
log: &slog::Logger,
migration_id: Uuid,
mut conn: WebSocketStream<T>,
) -> Result<SourceContext<T>, MigrateError> {
// Create a new log context for the migration
let log = log.new(o!(
"migration_id" => migration_id.to_string(),
"migrate_role" => "source"
));
info!(log, "Migration Source");

let selected = match conn.next().await {
Some(Ok(tungstenite::Message::Text(dst_protocols))) => {
info!(log, "destination offered protocols: {}", dst_protocols);
match protocol::select_protocol_from_offer(&dst_protocols) {
Ok(Some(selected)) => {
info!(log, "selected protocol {:?}", selected);
conn.send(tungstenite::Message::Text(
selected.offer_string(),
))
.await?;
selected
}
Ok(None) => {
let src_protocols = protocol::make_protocol_offer();
error!(
log,
"no compatible destination protocols";
"dst_protocols" => &dst_protocols,
"src_protocols" => &src_protocols,
);
return Err(MigrateError::NoMatchingProtocol(
src_protocols,
dst_protocols,
));
}
Err(e) => {
error!(log, "failed to parse destination protocol offer";
"dst_protocols" => &dst_protocols,
"error" => %e);
return Err(MigrateError::ProtocolParse(
dst_protocols,
e.to_string(),
));
}
}
}
x => {
conn.send(tungstenite::Message::Close(Some(CloseFrame {
code: CloseCode::Protocol,
reason: "did not begin with version handshake.".into(),
})))
.await?;
error!(
log,
"destination side did not begin migration version handshake: \
{:?}",
x
);
return Err(MigrateError::Initiate);
}
};

Ok(SourceContext { conn, protocol: selected })
}

pub(crate) struct DestinationContext<
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> {
pub migration_id: Uuid,
pub conn: WebSocketStream<T>,
pub local_addr: SocketAddr,
pub protocol: crate::migrate::protocol::Protocol,
}

/// Initiate a migration to the given source instance.
///
/// This will attempt to open a websocket to the given source instance and
/// check that the migrate protocol version is compatible ("equal" presently).
/// Once we've successfully established the connection, we can begin the
/// migration process (destination-side).
pub(crate) async fn dest_initiate(
log: &slog::Logger,
migrate_info: &api::InstanceMigrateInitiateRequest,
local_server_addr: SocketAddr,
) -> Result<
DestinationContext<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
MigrateError,
> {
let migration_id = migrate_info.migration_id;

// Create a new log context for the migration
let log = log.new(o!(
"migration_id" => migration_id.to_string(),
"migrate_role" => "destination",
"migrate_src_addr" => migrate_info.src_addr
));
info!(log, "Migration Destination");

// Build upgrade request to the source instance
// (we do this by hand because it's hidden from the OpenAPI spec)
// TODO(#165): https (wss)
// TODO: We need to make sure the src_addr is a valid target
let src_migrate_url = format!(
"ws://{}/instance/migrate/{}/start",
migrate_info.src_addr, migration_id,
);
info!(log, "Begin migration"; "src_migrate_url" => &src_migrate_url);
let (mut conn, _) =
tokio_tungstenite::connect_async(src_migrate_url).await?;

let dst_protocols = protocol::make_protocol_offer();
conn.send(tungstenite::Message::Text(dst_protocols)).await?;
let selected = match conn.next().await {
Some(Ok(tungstenite::Message::Text(selected_protocol))) => {
info!(log, "source negotiated protocol {}", selected_protocol);
match protocol::select_protocol_from_offer(&selected_protocol) {
Ok(Some(selected)) => selected,
Ok(None) => {
let offered = protocol::make_protocol_offer();
error!(log, "source selected protocol not on offer";
"offered" => &offered,
"selected" => &selected_protocol);

return Err(MigrateError::NoMatchingProtocol(
selected_protocol,
offered,
));
}
Err(e) => {
error!(log, "source selected protocol failed to parse";
"selected" => &selected_protocol);

return Err(MigrateError::ProtocolParse(
selected_protocol,
e.to_string(),
));
}
}
}
x => {
conn.send(tungstenite::Message::Close(Some(CloseFrame {
code: CloseCode::Protocol,
reason: "did not respond to version handshake.".into(),
})))
.await?;
error!(
log,
"source instance failed to negotiate protocol version: {:?}", x
);
return Err(MigrateError::Initiate);
}
};

Ok(DestinationContext {
migration_id,
conn,
local_addr: local_server_addr,
protocol: selected,
})
}

// We should probably turn this into some kind of ValidatedBitmap
// data structure, so that we're only parsing it once.
struct PageIter<'a> {
Expand Down
Loading