Skip to content

Commit

Permalink
Merge pull request #7173 from neondatabase/rc/proxy/2024-03-19
Browse files Browse the repository at this point in the history
Proxy release 2024-03-19
  • Loading branch information
conradludgate authored Mar 19, 2024
2 parents 27bc242 + 49be446 commit 5bad812
Show file tree
Hide file tree
Showing 98 changed files with 4,886 additions and 1,080 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ jobs:

- name: Pytest regression tests
uses: ./.github/actions/run-python-test-set
timeout-minutes: 60
with:
build_type: ${{ matrix.build_type }}
test_selection: regress
Expand Down
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ If you encounter errors during setting up the initial tenant, it's best to stop

## Running tests

### Rust unit tests

We are using [`cargo-nextest`](https://nexte.st/) to run the tests in Github Workflows.
Some crates do not support running plain `cargo test` anymore, prefer `cargo nextest run` instead.
You can install `cargo-nextest` with `cargo install cargo-nextest`.

### Integration tests

Ensure your dependencies are installed as described [here](https://github.com/neondatabase/neon#dependency-installation-notes).

```sh
Expand Down
2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]

disallowed-macros = [
Expand Down
18 changes: 10 additions & 8 deletions compute_tools/src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,19 +743,21 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension version with query: {}", query);
client.simple_query(query)?;
// DISABLED due to compute node unpinning epic
// let query = "ALTER EXTENSION neon UPDATE";
// info!("update neon extension version with query: {}", query);
// client.simple_query(query)?;

Ok(())
}

#[instrument(skip_all)]
pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
info!("handle neon extension upgrade");
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension version with query: {}", query);
client.simple_query(query)?;
pub fn handle_neon_extension_upgrade(_client: &mut Client) -> Result<()> {
info!("handle neon extension upgrade (not really)");
// DISABLED due to compute node unpinning epic
// let query = "ALTER EXTENSION neon UPDATE";
// info!("update neon extension version with query: {}", query);
// client.simple_query(query)?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ clap.workspace = true
comfy-table.workspace = true
futures.workspace = true
git-version.workspace = true
humantime.workspace = true
nix.workspace = true
once_cell.workspace = true
postgres.workspace = true
Expand Down
227 changes: 227 additions & 0 deletions control_plane/attachment_service/src/heartbeater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
use futures::{stream::FuturesUnordered, StreamExt};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;

use pageserver_api::{
controller_api::{NodeAvailability, UtilizationScore},
models::PageserverUtilization,
};

use thiserror::Error;
use utils::id::NodeId;

use crate::node::Node;

struct HeartbeaterTask {
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
cancel: CancellationToken,

state: HashMap<NodeId, PageserverState>,

max_unavailable_interval: Duration,
jwt_token: Option<String>,
}

#[derive(Debug, Clone)]
pub(crate) enum PageserverState {
Available {
last_seen_at: Instant,
utilization: PageserverUtilization,
},
Offline,
}

#[derive(Debug)]
pub(crate) struct AvailablityDeltas(pub Vec<(NodeId, PageserverState)>);

#[derive(Debug, Error)]
pub(crate) enum HeartbeaterError {
#[error("Cancelled")]
Cancel,
}

struct HeartbeatRequest {
pageservers: Arc<HashMap<NodeId, Node>>,
reply: tokio::sync::oneshot::Sender<Result<AvailablityDeltas, HeartbeaterError>>,
}

pub(crate) struct Heartbeater {
sender: tokio::sync::mpsc::UnboundedSender<HeartbeatRequest>,
}

impl Heartbeater {
pub(crate) fn new(
jwt_token: Option<String>,
max_unavailable_interval: Duration,
cancel: CancellationToken,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HeartbeatRequest>();
let mut heartbeater =
HeartbeaterTask::new(receiver, jwt_token, max_unavailable_interval, cancel);
tokio::task::spawn(async move { heartbeater.run().await });

Self { sender }
}

pub(crate) async fn heartbeat(
&self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
.send(HeartbeatRequest {
pageservers,
reply: sender,
})
.unwrap();

receiver.await.unwrap()
}
}

impl HeartbeaterTask {
fn new(
receiver: tokio::sync::mpsc::UnboundedReceiver<HeartbeatRequest>,
jwt_token: Option<String>,
max_unavailable_interval: Duration,
cancel: CancellationToken,
) -> Self {
Self {
receiver,
cancel,
state: HashMap::new(),
max_unavailable_interval,
jwt_token,
}
}

async fn run(&mut self) {
loop {
tokio::select! {
request = self.receiver.recv() => {
match request {
Some(req) => {
let res = self.heartbeat(req.pageservers).await;
req.reply.send(res).unwrap();
},
None => { return; }
}
},
_ = self.cancel.cancelled() => return
}
}
}

async fn heartbeat(
&mut self,
pageservers: Arc<HashMap<NodeId, Node>>,
) -> Result<AvailablityDeltas, HeartbeaterError> {
let mut new_state = HashMap::new();

let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, node) in &*pageservers {
heartbeat_futs.push({
let jwt_token = self.jwt_token.clone();
let cancel = self.cancel.clone();

// Clone the node and mark it as available such that the request
// goes through to the pageserver even when the node is marked offline.
// This doesn't impact the availability observed by [`crate::service::Service`].
let mut node = node.clone();
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));

async move {
let response = node
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
2,
3,
Duration::from_secs(1),
&cancel,
)
.await;

let response = match response {
Some(r) => r,
None => {
// This indicates cancellation of the request.
// We ignore the node in this case.
return None;
}
};

let status = if let Ok(utilization) = response {
PageserverState::Available {
last_seen_at: Instant::now(),
utilization,
}
} else {
PageserverState::Offline
};

Some((*node_id, status))
}
});

loop {
let maybe_status = tokio::select! {
next = heartbeat_futs.next() => {
match next {
Some(result) => result,
None => { break; }
}
},
_ = self.cancel.cancelled() => { return Err(HeartbeaterError::Cancel); }
};

if let Some((node_id, status)) = maybe_status {
new_state.insert(node_id, status);
}
}
}

let mut deltas = Vec::new();
let now = Instant::now();
for (node_id, ps_state) in new_state {
use std::collections::hash_map::Entry::*;
let entry = self.state.entry(node_id);

let mut needs_update = false;
match entry {
Occupied(ref occ) => match (occ.get(), &ps_state) {
(PageserverState::Offline, PageserverState::Offline) => {}
(PageserverState::Available { last_seen_at, .. }, PageserverState::Offline) => {
if now - *last_seen_at >= self.max_unavailable_interval {
deltas.push((node_id, ps_state.clone()));
needs_update = true;
}
}
_ => {
deltas.push((node_id, ps_state.clone()));
needs_update = true;
}
},
Vacant(_) => {
deltas.push((node_id, ps_state.clone()));
}
}

match entry {
Occupied(mut occ) if needs_update => {
(*occ.get_mut()) = ps_state;
}
Vacant(vac) => {
vac.insert(ps_state);
}
_ => {}
}
}

Ok(AvailablityDeltas(deltas))
}
}
19 changes: 14 additions & 5 deletions control_plane/attachment_service/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};

use utils::{
Expand All @@ -28,7 +28,7 @@ use utils::{
};

use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};

Expand Down Expand Up @@ -248,8 +248,10 @@ async fn handle_tenant_secondary_download(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
service.tenant_secondary_download(tenant_id).await?;
json_response(StatusCode::OK, ())
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);

let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
json_response(status, progress)
}

async fn handle_tenant_delete(
Expand Down Expand Up @@ -389,7 +391,14 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,

json_response(
StatusCode::OK,
state.service.node_configure(config_req).await?,
state
.service
.node_configure(
config_req.node_id,
config_req.availability.map(NodeAvailability::from),
config_req.scheduling,
)
.await?,
)
}

Expand Down
1 change: 1 addition & 0 deletions control_plane/attachment_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use utils::seqwait::MonotonicCounter;

mod auth;
mod compute_hook;
mod heartbeater;
pub mod http;
mod id_lock_map;
pub mod metrics;
Expand Down
Loading

1 comment on commit 5bad812

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2706 tests run: 2575 passed, 0 failed, 131 skipped (full report)


Code coverage* (full report)

  • functions: 28.3% (7131 of 25171 functions)
  • lines: 46.9% (43725 of 93319 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
5bad812 at 2024-03-19T12:58:02.732Z :recycle:

Please sign in to comment.