Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] Introduce VcOperation that wraps operations #70242

Open
wants to merge 4 commits into
base: canary
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions crates/napi/src/next_api/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use next_api::{
route::{Endpoint, WrittenEndpoint},
};
use tracing::Instrument;
use turbo_tasks::{Completion, ReadRef, Vc, VcValueType};
use turbo_tasks::{Completion, ReadRef, Vc, VcOperation, VcValueType};
use turbopack_core::{
diagnostics::PlainDiagnostic,
error::PrettyPrintError,
Expand Down Expand Up @@ -86,10 +86,10 @@ impl From<Option<WrittenEndpoint>> for NapiWrittenEndpoint {
// some async functions (in this case `endpoint_write_to_disk`) can cause
// higher-ranked lifetime errors. See https://github.com/rust-lang/rust/issues/102211
// 2. the type_complexity clippy lint.
pub struct ExternalEndpoint(pub VcArc<Vc<Box<dyn Endpoint>>>);
pub struct ExternalEndpoint(pub VcArc<Box<dyn Endpoint>>);

impl Deref for ExternalEndpoint {
type Target = VcArc<Vc<Box<dyn Endpoint>>>;
type Target = VcArc<Box<dyn Endpoint>>;

fn deref(&self) -> &Self::Target {
&self.0
Expand All @@ -99,13 +99,13 @@ impl Deref for ExternalEndpoint {
// Await the source and return fatal issues if there are any, otherwise
// propagate any actual error results.
async fn strongly_consistent_catch_collectables<R: VcValueType + Send>(
source: Vc<R>,
source: VcOperation<R>,
) -> Result<(
Option<ReadRef<R>>,
Arc<Vec<ReadRef<PlainIssue>>>,
Arc<Vec<ReadRef<PlainDiagnostic>>>,
)> {
let result = source.strongly_consistent().await;
let result = source.connect().strongly_consistent().await;
let issues = get_issues(source).await?;
let diagnostics = get_diagnostics(source).await?;

Expand All @@ -127,11 +127,12 @@ struct WrittenEndpointWithIssues {

#[turbo_tasks::function]
async fn get_written_endpoint_with_issues(
endpoint: Vc<Box<dyn Endpoint>>,
endpoint: VcOperation<Box<dyn Endpoint>>,
) -> Result<Vc<WrittenEndpointWithIssues>> {
let write_to_disk = endpoint.write_to_disk();
let write_to_disk = endpoint.connect().write_to_disk(endpoint);
let write_to_disk_operation = VcOperation::new(write_to_disk);
let (written, issues, diagnostics) =
strongly_consistent_catch_collectables(write_to_disk).await?;
strongly_consistent_catch_collectables(write_to_disk_operation).await?;
Ok(WrittenEndpointWithIssues {
written,
issues,
Expand Down Expand Up @@ -227,14 +228,15 @@ impl Eq for EndpointIssuesAndDiags {}

#[turbo_tasks::function]
async fn subscribe_issues_and_diags(
endpoint: Vc<Box<dyn Endpoint>>,
endpoint: VcOperation<Box<dyn Endpoint>>,
should_include_issues: bool,
) -> Result<Vc<EndpointIssuesAndDiags>> {
let changed = endpoint.server_changed();
let changed = endpoint.connect().server_changed();
let changed_operation = VcOperation::new(changed);

if should_include_issues {
let (changed_value, issues, diagnostics) =
strongly_consistent_catch_collectables(changed).await?;
strongly_consistent_catch_collectables(changed_operation).await?;
Ok(EndpointIssuesAndDiags {
changed: changed_value,
issues,
Expand Down Expand Up @@ -264,7 +266,7 @@ pub fn endpoint_client_changed_subscribe(
func,
move || {
async move {
let changed = endpoint.client_changed();
let changed = endpoint.connect().client_changed();
// We don't capture issues and diagonistics here since we don't want to be
// notified when they change
changed.strongly_consistent().await?;
Expand Down
49 changes: 28 additions & 21 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
JsFunction, Status,
};
use next_api::{
entrypoints::Entrypoints,
operation::{
EntrypointsOperation, InstrumentationOperation, MiddlewareOperation, RouteOperation,
},
project::{
DefineEnv, DraftModeOptions, Instrumentation, Middleware, PartialProjectOptions, Project,
ProjectContainer, ProjectOptions,
DefineEnv, DraftModeOptions, PartialProjectOptions, Project, ProjectContainer,
ProjectOptions,
},
route::{Endpoint, Route},
route::Endpoint,
};
use next_core::tracing_presets::{
TRACING_NEXT_OVERVIEW_TARGETS, TRACING_NEXT_TARGETS, TRACING_NEXT_TURBOPACK_TARGETS,
Expand All @@ -22,7 +24,9 @@
use tokio::{io::AsyncWriteExt, time::Instant};
use tracing::Instrument;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry};
use turbo_tasks::{Completion, RcStr, ReadRef, TransientInstance, TurboTasks, UpdateInfo, Vc};
use turbo_tasks::{
Completion, RcStr, ReadRef, TransientInstance, TurboTasks, UpdateInfo, Vc, VcOperation,
};
use turbo_tasks_fs::{DiskFileSystem, FileContent, FileSystem, FileSystemPath};
use turbo_tasks_memory::MemoryBackend;
use turbopack_core::{
Expand Down Expand Up @@ -479,17 +483,17 @@
impl NapiRoute {
fn from_route(
pathname: String,
value: Route,
value: RouteOperation,
turbo_tasks: &Arc<TurboTasks<MemoryBackend>>,
) -> Self {
let convert_endpoint = |endpoint: Vc<Box<dyn Endpoint>>| {
let convert_endpoint = |endpoint: VcOperation<Box<dyn Endpoint>>| {
Some(External::new(ExternalEndpoint(VcArc::new(
turbo_tasks.clone(),
endpoint,
))))
};
match value {
Route::Page {
RouteOperation::Page {
html_endpoint,
data_endpoint,
} => NapiRoute {
Expand All @@ -499,13 +503,13 @@
data_endpoint: convert_endpoint(data_endpoint),
..Default::default()
},
Route::PageApi { endpoint } => NapiRoute {
RouteOperation::PageApi { endpoint } => NapiRoute {
pathname,
r#type: "page-api",
endpoint: convert_endpoint(endpoint),
..Default::default()
},
Route::AppPage(pages) => NapiRoute {
RouteOperation::AppPage(pages) => NapiRoute {
pathname,
r#type: "app-page",
pages: Some(
Expand All @@ -520,7 +524,7 @@
),
..Default::default()
},
Route::AppRoute {
RouteOperation::AppRoute {
original_name,
endpoint,
} => NapiRoute {
Expand All @@ -530,7 +534,7 @@
endpoint: convert_endpoint(endpoint),
..Default::default()
},
Route::Conflict => NapiRoute {
RouteOperation::Conflict => NapiRoute {
pathname,
r#type: "conflict",
..Default::default()
Expand All @@ -546,7 +550,7 @@

impl NapiMiddleware {
fn from_middleware(
value: &Middleware,
value: &MiddlewareOperation,
turbo_tasks: &Arc<TurboTasks<MemoryBackend>>,
) -> Result<Self> {
Ok(NapiMiddleware {
Expand All @@ -566,7 +570,7 @@

impl NapiInstrumentation {
fn from_instrumentation(
value: &Instrumentation,
value: &InstrumentationOperation,
turbo_tasks: &Arc<TurboTasks<MemoryBackend>>,
) -> Result<Self> {
Ok(NapiInstrumentation {
Expand Down Expand Up @@ -594,7 +598,7 @@

#[turbo_tasks::value(serialization = "none")]
struct EntrypointsWithIssues {
entrypoints: ReadRef<Entrypoints>,
entrypoints: ReadRef<EntrypointsOperation>,
issues: Arc<Vec<ReadRef<PlainIssue>>>,
diagnostics: Arc<Vec<ReadRef<PlainDiagnostic>>>,
}
Expand All @@ -603,8 +607,9 @@
async fn get_entrypoints_with_issues(
container: Vc<ProjectContainer>,
) -> Result<Vc<EntrypointsWithIssues>> {
let entrypoints_operation = container.entrypoints();
let entrypoints = entrypoints_operation.strongly_consistent().await?;
let entrypoints = container.entrypoints_operation();
let entrypoints_operation = VcOperation::new(entrypoints);
let entrypoints = entrypoints.strongly_consistent().await?;
let issues = get_issues(entrypoints_operation).await?;
let diagnostics = get_diagnostics(entrypoints_operation).await?;
Ok(EntrypointsWithIssues {
Expand Down Expand Up @@ -700,8 +705,9 @@
identifier: RcStr,
state: Vc<VersionState>,
) -> Result<Vc<HmrUpdateWithIssues>> {
let update_operation = project.hmr_update(identifier, state);
let update = update_operation.strongly_consistent().await?;
let update = project.hmr_update(identifier, state);
let update_operation = VcOperation::new(update);
let update = update.strongly_consistent().await?;
let issues = get_issues(update_operation).await?;
let diagnostics = get_diagnostics(update_operation).await?;
Ok(HmrUpdateWithIssues {
Expand Down Expand Up @@ -812,8 +818,9 @@
async fn get_hmr_identifiers_with_issues(
container: Vc<ProjectContainer>,
) -> Result<Vc<HmrIdentifiersWithIssues>> {
let hmr_identifiers_operation = container.hmr_identifiers();
let hmr_identifiers = hmr_identifiers_operation.strongly_consistent().await?;
let hmr_identifiers = container.hmr_identifiers();
let hmr_identifiers_operation = VcOperation::new(hmr_identifiers);
let hmr_identifiers = hmr_identifiers.strongly_consistent().await?;
let issues = get_issues(hmr_identifiers_operation).await?;
let diagnostics = get_diagnostics(hmr_identifiers_operation).await?;
Ok(HmrIdentifiersWithIssues {
Expand Down Expand Up @@ -909,7 +916,7 @@
}
}

/// Subscribes to lifecycle events of the compilation.

Check warning on line 919 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`

Check warning on line 919 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 919 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 919 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`
/// Emits an [UpdateMessage::Start] event when any computation starts.
/// Emits an [UpdateMessage::End] event when there was no computation for the
/// specified time (`aggregation_ms`). The [UpdateMessage::End] event contains
Expand Down
36 changes: 24 additions & 12 deletions crates/napi/src/next_api/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use napi::{
JsFunction, JsObject, JsUnknown, NapiRaw, NapiValue, Status,
};
use serde::Serialize;
use turbo_tasks::{ReadRef, TaskId, TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks::{ReadRef, TaskId, TryJoinIterExt, TurboTasks, Vc, VcOperation};
use turbo_tasks_fs::FileContent;
use turbo_tasks_memory::MemoryBackend;
use turbopack_core::{
Expand All @@ -20,17 +20,24 @@ use turbopack_core::{
use crate::util::log_internal_error_and_inform;

/// A helper type to hold both a Vc operation and the TurboTasks root process.
/// Without this, we'd need to pass both individually all over the place
/// Without this, we'd need to pass both individually all over the place.
/// Make sure to Vc::connect the `vc` when using it.
#[derive(Clone)]
pub struct VcArc<T> {
pub struct VcArc<T>
where
T: Send,
{
turbo_tasks: Arc<TurboTasks<MemoryBackend>>,
/// The Vc. Must be resolved, otherwise you are referencing an inactive
/// operation.
vc: T,
// TODO: this should be a VcOperation
/// The Vc.
vc: VcOperation<T>,
}

impl<T> VcArc<T> {
pub fn new(turbo_tasks: Arc<TurboTasks<MemoryBackend>>, vc: T) -> Self {
impl<T> VcArc<T>
where
T: Send,
{
pub fn new(turbo_tasks: Arc<TurboTasks<MemoryBackend>>, vc: VcOperation<T>) -> Self {
Self { turbo_tasks, vc }
}

Expand All @@ -39,8 +46,11 @@ impl<T> VcArc<T> {
}
}

impl<T> Deref for VcArc<T> {
type Target = T;
impl<T> Deref for VcArc<T>
where
T: Send,
{
type Target = VcOperation<T>;

fn deref(&self) -> &Self::Target {
&self.vc
Expand Down Expand Up @@ -78,7 +88,7 @@ pub fn root_task_dispose(
Ok(())
}

pub async fn get_issues<T: Send>(source: Vc<T>) -> Result<Arc<Vec<ReadRef<PlainIssue>>>> {
pub async fn get_issues<T: Send>(source: VcOperation<T>) -> Result<Arc<Vec<ReadRef<PlainIssue>>>> {
let issues = source.peek_issues_with_path().await?;
Ok(Arc::new(issues.get_plain_issues().await?))
}
Expand All @@ -87,7 +97,9 @@ pub async fn get_issues<T: Send>(source: Vc<T>) -> Result<Arc<Vec<ReadRef<PlainI
/// by the given source and returns it as a
/// [turbopack_core::diagnostics::PlainDiagnostic]. It does
/// not consume any Diagnostics held by the source.
pub async fn get_diagnostics<T: Send>(source: Vc<T>) -> Result<Arc<Vec<ReadRef<PlainDiagnostic>>>> {
pub async fn get_diagnostics<T: Send>(
source: VcOperation<T>,
) -> Result<Arc<Vec<ReadRef<PlainDiagnostic>>>> {
let captured_diags = source.peek_diagnostics().await?;
let mut diags = captured_diags
.diagnostics
Expand Down
17 changes: 10 additions & 7 deletions crates/next-api/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use next_core::{
};
use serde::{Deserialize, Serialize};
use tracing::Instrument;
use turbo_tasks::{trace::TraceRawVcs, Completion, RcStr, TryJoinIterExt, Value, Vc};
use turbo_tasks::{trace::TraceRawVcs, Completion, RcStr, TryJoinIterExt, Value, Vc, VcOperation};
use turbo_tasks_env::{CustomProcessEnv, ProcessEnv};
use turbo_tasks_fs::{File, FileContent, FileSystemPath};
use turbopack::{
Expand Down Expand Up @@ -796,8 +796,10 @@ impl AppEndpoint {
Ok(app_entry)
}

/// This is used to wrap output assets of an endpoint into a single operation (VcOperation)
#[turbo_tasks::function]
fn output_assets(self: Vc<Self>) -> Vc<OutputAssets> {
fn output_assets(self: Vc<Self>, endpoint: VcOperation<Box<dyn Endpoint>>) -> Vc<OutputAssets> {
let _ = endpoint.connect();
self.output().output_assets()
}

Expand Down Expand Up @@ -1317,7 +1319,10 @@ impl AppEndpoint {
#[turbo_tasks::value_impl]
impl Endpoint for AppEndpoint {
#[turbo_tasks::function]
async fn write_to_disk(self: Vc<Self>) -> Result<Vc<WrittenEndpoint>> {
async fn write_to_disk(
self: Vc<Self>,
self_op: VcOperation<Box<dyn Endpoint>>,
) -> Result<Vc<WrittenEndpoint>> {
let this = self.await?;
let page_name = this.page.to_string();
let span = match this.ty {
Expand All @@ -1342,17 +1347,15 @@ impl Endpoint for AppEndpoint {
};
async move {
let output = self.output().await?;
// Must use self.output_assets() instead of output.output_assets() to make it a
// single operation
let output_assets = self.output_assets();
let output_assets = self.output_assets(self_op);

let node_root = this.app_project.project().node_root();

let node_root_ref = &node_root.await?;

this.app_project
.project()
.emit_all_output_assets(Vc::cell(output_assets))
.emit_all_output_assets(VcOperation::new(output_assets))
.await?;

let node_root = this.app_project.project().node_root();
Expand Down
7 changes: 5 additions & 2 deletions crates/next-api/src/empty.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{bail, Result};
use turbo_tasks::{Completion, Vc};
use turbo_tasks::{Completion, Vc, VcOperation};
use turbopack_core::module::Modules;

use crate::route::{Endpoint, WrittenEndpoint};
Expand All @@ -18,7 +18,10 @@ impl EmptyEndpoint {
#[turbo_tasks::value_impl]
impl Endpoint for EmptyEndpoint {
#[turbo_tasks::function]
fn write_to_disk(self: Vc<Self>) -> Result<Vc<WrittenEndpoint>> {
fn write_to_disk(
self: Vc<Self>,
_self_op: VcOperation<Box<dyn Endpoint>>,
) -> Result<Vc<WrittenEndpoint>> {
bail!("Empty endpoint can't be written to disk")
}

Expand Down
Loading
Loading