diff --git a/compiler/rustc_codegen_ssa/src/back/symbol_export.rs b/compiler/rustc_codegen_ssa/src/back/symbol_export.rs
index b19f52182b650..cce3f0e6f2d12 100644
--- a/compiler/rustc_codegen_ssa/src/back/symbol_export.rs
+++ b/compiler/rustc_codegen_ssa/src/back/symbol_export.rs
@@ -363,6 +363,24 @@ fn exported_symbols_provider_local(
                         },
                     ));
                 }
+                MonoItem::Fn(Instance {
+                    def: InstanceDef::AsyncDropGlueCtorShim(def_id, Some(ty)),
+                    args,
+                }) => {
+                    // A little sanity-check
+                    debug_assert_eq!(
+                        args.non_erasable_generics(tcx, def_id).skip(1).next(),
+                        Some(GenericArgKind::Type(ty))
+                    );
+                    symbols.push((
+                        ExportedSymbol::AsyncDropGlueCtorShim(ty),
+                        SymbolExportInfo {
+                            level: SymbolExportLevel::Rust,
+                            kind: SymbolExportKind::Text,
+                            used: false,
+                        },
+                    ));
+                }
                 _ => {
                     // Any other symbols don't qualify for sharing
                 }
@@ -385,6 +403,7 @@ fn upstream_monomorphizations_provider(
     let mut instances: DefIdMap<UnordMap<_, _>> = Default::default();
 
     let drop_in_place_fn_def_id = tcx.lang_items().drop_in_place_fn();
+    let async_drop_in_place_fn_def_id = tcx.lang_items().async_drop_in_place_fn();
 
     for &cnum in cnums.iter() {
         for (exported_symbol, _) in tcx.exported_symbols(cnum).iter() {
@@ -399,6 +418,18 @@ fn upstream_monomorphizations_provider(
                         continue;
                     }
                 }
+                ExportedSymbol::AsyncDropGlueCtorShim(ty) => {
+                    if let Some(async_drop_in_place_fn_def_id) = async_drop_in_place_fn_def_id {
+                        (
+                            async_drop_in_place_fn_def_id,
+                            tcx.mk_args(&[tcx.lifetimes.re_erased.into(), ty.into()]),
+                        )
+                    } else {
+                        // `drop_in_place` in place does not exist, don't try
+                        // to use it.
+                        continue;
+                    }
+                }
                 ExportedSymbol::NonGeneric(..)
                 | ExportedSymbol::ThreadLocalShim(..)
                 | ExportedSymbol::NoDefId(..) => {
@@ -534,6 +565,13 @@ pub fn symbol_name_for_instance_in_crate<'tcx>(
             Instance::resolve_drop_in_place(tcx, ty),
             instantiating_crate,
         ),
+        ExportedSymbol::AsyncDropGlueCtorShim(ty) => {
+            rustc_symbol_mangling::symbol_name_for_instance_in_crate(
+                tcx,
+                Instance::resolve_async_drop_in_place(tcx, ty),
+                instantiating_crate,
+            )
+        }
         ExportedSymbol::NoDefId(symbol_name) => symbol_name.to_string(),
     }
 }
@@ -582,6 +620,9 @@ pub fn linking_symbol_name_for_instance_in_crate<'tcx>(
         // DropGlue always use the Rust calling convention and thus follow the target's default
         // symbol decoration scheme.
         ExportedSymbol::DropGlue(..) => None,
+        // AsyncDropGlueCtorShim always use the Rust calling convention and thus follow the
+        // target's default symbol decoration scheme.
+        ExportedSymbol::AsyncDropGlueCtorShim(..) => None,
         // NoDefId always follow the target's default symbol decoration scheme.
         ExportedSymbol::NoDefId(..) => None,
         // ThreadLocalShim always follow the target's default symbol decoration scheme.
diff --git a/compiler/rustc_codegen_ssa/src/mir/block.rs b/compiler/rustc_codegen_ssa/src/mir/block.rs
index 24f2c50e882fb..fad81600576ee 100644
--- a/compiler/rustc_codegen_ssa/src/mir/block.rs
+++ b/compiler/rustc_codegen_ssa/src/mir/block.rs
@@ -835,7 +835,10 @@ impl<'a, 'tcx, Bx: BuilderMethods<'a, 'tcx>> FunctionCx<'a, 'tcx, Bx> {
 
         let def = instance.map(|i| i.def);
 
-        if let Some(ty::InstanceDef::DropGlue(_, None)) = def {
+        if let Some(
+            ty::InstanceDef::DropGlue(_, None) | ty::InstanceDef::AsyncDropGlueCtorShim(_, None),
+        ) = def
+        {
             // Empty drop glue; a no-op.
             let target = target.unwrap();
             return helper.funclet_br(self, bx, target, mergeable_succ);
diff --git a/compiler/rustc_const_eval/src/interpret/terminator.rs b/compiler/rustc_const_eval/src/interpret/terminator.rs
index c0e27e86d500a..908c4da71f166 100644
--- a/compiler/rustc_const_eval/src/interpret/terminator.rs
+++ b/compiler/rustc_const_eval/src/interpret/terminator.rs
@@ -558,6 +558,7 @@ impl<'mir, 'tcx: 'mir, M: Machine<'mir, 'tcx>> InterpCx<'mir, 'tcx, M> {
             | ty::InstanceDef::CloneShim(..)
             | ty::InstanceDef::FnPtrAddrShim(..)
             | ty::InstanceDef::ThreadLocalShim(..)
+            | ty::InstanceDef::AsyncDropGlueCtorShim(..)
             | ty::InstanceDef::Item(_) => {
                 // We need MIR for this fn
                 let Some((body, instance)) = M::find_mir_or_eval_fn(
diff --git a/compiler/rustc_hir/src/lang_items.rs b/compiler/rustc_hir/src/lang_items.rs
index 2a796ca5465c9..879476f890e8c 100644
--- a/compiler/rustc_hir/src/lang_items.rs
+++ b/compiler/rustc_hir/src/lang_items.rs
@@ -162,6 +162,18 @@ language_item_table! {
     Drop,                    sym::drop,                drop_trait,                 Target::Trait,          GenericRequirement::None;
     Destruct,                sym::destruct,            destruct_trait,             Target::Trait,          GenericRequirement::None;
 
+    AsyncDrop,               sym::async_drop,          async_drop_trait,           Target::Trait,          GenericRequirement::Exact(0);
+    AsyncDestruct,           sym::async_destruct,      async_destruct_trait,       Target::Trait,          GenericRequirement::Exact(0);
+    AsyncDropInPlace,        sym::async_drop_in_place, async_drop_in_place_fn,     Target::Fn,             GenericRequirement::Exact(1);
+    SurfaceAsyncDropInPlace, sym::surface_async_drop_in_place, surface_async_drop_in_place_fn, Target::Fn, GenericRequirement::Exact(1);
+    AsyncDropSurfaceDropInPlace, sym::async_drop_surface_drop_in_place, async_drop_surface_drop_in_place_fn, Target::Fn, GenericRequirement::Exact(1);
+    AsyncDropSlice,          sym::async_drop_slice,    async_drop_slice_fn,        Target::Fn,             GenericRequirement::Exact(1);
+    AsyncDropChain,          sym::async_drop_chain,    async_drop_chain_fn,        Target::Fn,             GenericRequirement::Exact(2);
+    AsyncDropNoop,           sym::async_drop_noop,     async_drop_noop_fn,         Target::Fn,             GenericRequirement::Exact(0);
+    AsyncDropFuse,           sym::async_drop_fuse,     async_drop_fuse_fn,         Target::Fn,             GenericRequirement::Exact(1);
+    AsyncDropDefer,          sym::async_drop_defer,    async_drop_defer_fn,        Target::Fn,             GenericRequirement::Exact(1);
+    AsyncDropEither,         sym::async_drop_either,   async_drop_either_fn,       Target::Fn,             GenericRequirement::Exact(3);
+
     CoerceUnsized,           sym::coerce_unsized,      coerce_unsized_trait,       Target::Trait,          GenericRequirement::Minimum(1);
     DispatchFromDyn,         sym::dispatch_from_dyn,   dispatch_from_dyn_trait,    Target::Trait,          GenericRequirement::Minimum(1);
 
@@ -281,6 +293,7 @@ language_item_table! {
 
     ExchangeMalloc,          sym::exchange_malloc,     exchange_malloc_fn,         Target::Fn,             GenericRequirement::None;
     DropInPlace,             sym::drop_in_place,       drop_in_place_fn,           Target::Fn,             GenericRequirement::Minimum(1);
+    FallbackSurfaceDrop,     sym::fallback_surface_drop, fallback_surface_drop_fn, Target::Fn,             GenericRequirement::None;
     AllocLayout,             sym::alloc_layout,        alloc_layout,               Target::Struct,         GenericRequirement::None;
 
     Start,                   sym::start,               start_fn,                   Target::Fn,             GenericRequirement::Exact(1);
diff --git a/compiler/rustc_hir_typeck/src/callee.rs b/compiler/rustc_hir_typeck/src/callee.rs
index a64d3b9633f26..dfd0b7c2945c4 100644
--- a/compiler/rustc_hir_typeck/src/callee.rs
+++ b/compiler/rustc_hir_typeck/src/callee.rs
@@ -38,8 +38,11 @@ pub fn check_legal_trait_for_method_call(
     receiver: Option<Span>,
     expr_span: Span,
     trait_id: DefId,
+    body_id: DefId,
 ) -> Result<(), ErrorGuaranteed> {
-    if tcx.lang_items().drop_trait() == Some(trait_id) {
+    if tcx.lang_items().drop_trait() == Some(trait_id)
+        && tcx.lang_items().fallback_surface_drop_fn() != Some(body_id)
+    {
         let sugg = if let Some(receiver) = receiver.filter(|s| !s.is_empty()) {
             errors::ExplicitDestructorCallSugg::Snippet {
                 lo: expr_span.shrink_to_lo(),
diff --git a/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs b/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
index 2d5ba447e4e68..02bd0bf18ce47 100644
--- a/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
+++ b/compiler/rustc_hir_typeck/src/fn_ctxt/_impl.rs
@@ -1118,6 +1118,7 @@ impl<'a, 'tcx> FnCtxt<'a, 'tcx> {
                             None,
                             span,
                             container_id,
+                            self.body_id.to_def_id(),
                         ) {
                             self.set_tainted_by_errors(e);
                         }
diff --git a/compiler/rustc_hir_typeck/src/method/confirm.rs b/compiler/rustc_hir_typeck/src/method/confirm.rs
index 36860e446fc2b..ce1f3b267190c 100644
--- a/compiler/rustc_hir_typeck/src/method/confirm.rs
+++ b/compiler/rustc_hir_typeck/src/method/confirm.rs
@@ -628,6 +628,7 @@ impl<'a, 'tcx> ConfirmContext<'a, 'tcx> {
                 Some(self.self_expr.span),
                 self.call_expr.span,
                 trait_def_id,
+                self.body_id.to_def_id(),
             ) {
                 self.set_tainted_by_errors(e);
             }
diff --git a/compiler/rustc_middle/src/middle/exported_symbols.rs b/compiler/rustc_middle/src/middle/exported_symbols.rs
index e30b6b203d73c..123b32f4aea94 100644
--- a/compiler/rustc_middle/src/middle/exported_symbols.rs
+++ b/compiler/rustc_middle/src/middle/exported_symbols.rs
@@ -43,6 +43,7 @@ pub enum ExportedSymbol<'tcx> {
     NonGeneric(DefId),
     Generic(DefId, GenericArgsRef<'tcx>),
     DropGlue(Ty<'tcx>),
+    AsyncDropGlueCtorShim(Ty<'tcx>),
     ThreadLocalShim(DefId),
     NoDefId(ty::SymbolName<'tcx>),
 }
@@ -59,6 +60,9 @@ impl<'tcx> ExportedSymbol<'tcx> {
             ExportedSymbol::DropGlue(ty) => {
                 tcx.symbol_name(ty::Instance::resolve_drop_in_place(tcx, ty))
             }
+            ExportedSymbol::AsyncDropGlueCtorShim(ty) => {
+                tcx.symbol_name(ty::Instance::resolve_async_drop_in_place(tcx, ty))
+            }
             ExportedSymbol::ThreadLocalShim(def_id) => tcx.symbol_name(ty::Instance {
                 def: ty::InstanceDef::ThreadLocalShim(def_id),
                 args: ty::GenericArgs::empty(),
diff --git a/compiler/rustc_middle/src/mir/mono.rs b/compiler/rustc_middle/src/mir/mono.rs
index 43e1318a75ab1..9eed701978232 100644
--- a/compiler/rustc_middle/src/mir/mono.rs
+++ b/compiler/rustc_middle/src/mir/mono.rs
@@ -65,7 +65,9 @@ impl<'tcx> MonoItem<'tcx> {
                 match instance.def {
                     // "Normal" functions size estimate: the number of
                     // statements, plus one for the terminator.
-                    InstanceDef::Item(..) | InstanceDef::DropGlue(..) => {
+                    InstanceDef::Item(..)
+                    | InstanceDef::DropGlue(..)
+                    | InstanceDef::AsyncDropGlueCtorShim(..) => {
                         let mir = tcx.instance_mir(instance.def);
                         mir.basic_blocks.iter().map(|bb| bb.statements.len() + 1).sum()
                     }
@@ -406,7 +408,8 @@ impl<'tcx> CodegenUnit<'tcx> {
                             | InstanceDef::DropGlue(..)
                             | InstanceDef::CloneShim(..)
                             | InstanceDef::ThreadLocalShim(..)
-                            | InstanceDef::FnPtrAddrShim(..) => None,
+                            | InstanceDef::FnPtrAddrShim(..)
+                            | InstanceDef::AsyncDropGlueCtorShim(..) => None,
                         }
                     }
                     MonoItem::Static(def_id) => def_id.as_local().map(Idx::index),
diff --git a/compiler/rustc_middle/src/mir/pretty.rs b/compiler/rustc_middle/src/mir/pretty.rs
index 15bd5c089652c..151170d78f491 100644
--- a/compiler/rustc_middle/src/mir/pretty.rs
+++ b/compiler/rustc_middle/src/mir/pretty.rs
@@ -187,6 +187,17 @@ fn dump_path<'tcx>(
             }));
             s
         }
+        ty::InstanceDef::AsyncDropGlueCtorShim(_, Some(ty)) => {
+            // Unfortunately, pretty-printed typed are not very filename-friendly.
+            // We dome some filtering.
+            let mut s = ".".to_owned();
+            s.extend(ty.to_string().chars().filter_map(|c| match c {
+                ' ' => None,
+                ':' | '<' | '>' => Some('_'),
+                c => Some(c),
+            }));
+            s
+        }
         _ => String::new(),
     };
 
diff --git a/compiler/rustc_middle/src/mir/visit.rs b/compiler/rustc_middle/src/mir/visit.rs
index 4f7b2f7cbe48b..1c38a69c7ec2d 100644
--- a/compiler/rustc_middle/src/mir/visit.rs
+++ b/compiler/rustc_middle/src/mir/visit.rs
@@ -350,12 +350,14 @@ macro_rules! make_mir_visitor {
                             receiver_by_ref: _,
                         } |
                         ty::InstanceDef::CoroutineKindShim { coroutine_def_id: _def_id } |
+                        ty::InstanceDef::AsyncDropGlueCtorShim(_def_id, None) |
                         ty::InstanceDef::DropGlue(_def_id, None) => {}
 
                         ty::InstanceDef::FnPtrShim(_def_id, ty) |
                         ty::InstanceDef::DropGlue(_def_id, Some(ty)) |
                         ty::InstanceDef::CloneShim(_def_id, ty) |
-                        ty::InstanceDef::FnPtrAddrShim(_def_id, ty) => {
+                        ty::InstanceDef::FnPtrAddrShim(_def_id, ty) |
+                        ty::InstanceDef::AsyncDropGlueCtorShim(_def_id, Some(ty)) => {
                             // FIXME(eddyb) use a better `TyContext` here.
                             self.visit_ty($(& $mutability)? *ty, TyContext::Location(location));
                         }
diff --git a/compiler/rustc_middle/src/query/mod.rs b/compiler/rustc_middle/src/query/mod.rs
index 0d625ff0faeb5..4ae79399ef483 100644
--- a/compiler/rustc_middle/src/query/mod.rs
+++ b/compiler/rustc_middle/src/query/mod.rs
@@ -1344,6 +1344,14 @@ rustc_queries! {
     query is_unpin_raw(env: ty::ParamEnvAnd<'tcx, Ty<'tcx>>) -> bool {
         desc { "computing whether `{}` is `Unpin`", env.value }
     }
+    /// Query backing `Ty::has_surface_async_drop`.
+    query has_surface_async_drop_raw(env: ty::ParamEnvAnd<'tcx, Ty<'tcx>>) -> bool {
+        desc { "computing whether `{}` has `AsyncDrop` implementation", env.value }
+    }
+    /// Query backing `Ty::has_surface_drop`.
+    query has_surface_drop_raw(env: ty::ParamEnvAnd<'tcx, Ty<'tcx>>) -> bool {
+        desc { "computing whether `{}` has `Drop` implementation", env.value }
+    }
     /// Query backing `Ty::needs_drop`.
     query needs_drop_raw(env: ty::ParamEnvAnd<'tcx, Ty<'tcx>>) -> bool {
         desc { "computing whether `{}` needs drop", env.value }
diff --git a/compiler/rustc_middle/src/ty/instance.rs b/compiler/rustc_middle/src/ty/instance.rs
index f8f59fbeab449..4002d0da79095 100644
--- a/compiler/rustc_middle/src/ty/instance.rs
+++ b/compiler/rustc_middle/src/ty/instance.rs
@@ -168,6 +168,12 @@ pub enum InstanceDef<'tcx> {
     ///
     /// The `DefId` is for `FnPtr::addr`, the `Ty` is the type `T`.
     FnPtrAddrShim(DefId, Ty<'tcx>),
+
+    /// `core::future::async_drop::async_drop_in_place::<'_, T>`.
+    ///
+    /// The `DefId` is for `core::future::async_drop::async_drop_in_place`, the `Ty`
+    /// is the type `T`.
+    AsyncDropGlueCtorShim(DefId, Option<Ty<'tcx>>),
 }
 
 impl<'tcx> Instance<'tcx> {
@@ -210,7 +216,9 @@ impl<'tcx> Instance<'tcx> {
             InstanceDef::Item(def) => tcx
                 .upstream_monomorphizations_for(def)
                 .and_then(|monos| monos.get(&self.args).cloned()),
-            InstanceDef::DropGlue(_, Some(_)) => tcx.upstream_drop_glue_for(self.args),
+            InstanceDef::DropGlue(_, Some(_)) | InstanceDef::AsyncDropGlueCtorShim(_, _) => {
+                tcx.upstream_drop_glue_for(self.args)
+            }
             _ => None,
         }
     }
@@ -235,7 +243,8 @@ impl<'tcx> InstanceDef<'tcx> {
             | ty::InstanceDef::CoroutineKindShim { coroutine_def_id: def_id }
             | InstanceDef::DropGlue(def_id, _)
             | InstanceDef::CloneShim(def_id, _)
-            | InstanceDef::FnPtrAddrShim(def_id, _) => def_id,
+            | InstanceDef::FnPtrAddrShim(def_id, _)
+            | InstanceDef::AsyncDropGlueCtorShim(def_id, _) => def_id,
         }
     }
 
@@ -243,9 +252,9 @@ impl<'tcx> InstanceDef<'tcx> {
     pub fn def_id_if_not_guaranteed_local_codegen(self) -> Option<DefId> {
         match self {
             ty::InstanceDef::Item(def) => Some(def),
-            ty::InstanceDef::DropGlue(def_id, Some(_)) | InstanceDef::ThreadLocalShim(def_id) => {
-                Some(def_id)
-            }
+            ty::InstanceDef::DropGlue(def_id, Some(_))
+            | InstanceDef::AsyncDropGlueCtorShim(def_id, _)
+            | InstanceDef::ThreadLocalShim(def_id) => Some(def_id),
             InstanceDef::VTableShim(..)
             | InstanceDef::ReifyShim(..)
             | InstanceDef::FnPtrShim(..)
@@ -279,6 +288,7 @@ impl<'tcx> InstanceDef<'tcx> {
         let def_id = match *self {
             ty::InstanceDef::Item(def) => def,
             ty::InstanceDef::DropGlue(_, Some(_)) => return false,
+            ty::InstanceDef::AsyncDropGlueCtorShim(_, Some(_)) => return false,
             ty::InstanceDef::ThreadLocalShim(_) => return false,
             _ => return true,
         };
@@ -347,11 +357,13 @@ impl<'tcx> InstanceDef<'tcx> {
             | InstanceDef::ThreadLocalShim(..)
             | InstanceDef::FnPtrAddrShim(..)
             | InstanceDef::FnPtrShim(..)
-            | InstanceDef::DropGlue(_, Some(_)) => false,
+            | InstanceDef::DropGlue(_, Some(_))
+            | InstanceDef::AsyncDropGlueCtorShim(_, Some(_)) => false,
             InstanceDef::ClosureOnceShim { .. }
             | InstanceDef::ConstructCoroutineInClosureShim { .. }
             | InstanceDef::CoroutineKindShim { .. }
             | InstanceDef::DropGlue(..)
+            | InstanceDef::AsyncDropGlueCtorShim(..)
             | InstanceDef::Item(_)
             | InstanceDef::Intrinsic(..)
             | InstanceDef::ReifyShim(..)
@@ -396,6 +408,8 @@ fn fmt_instance(
         InstanceDef::DropGlue(_, Some(ty)) => write!(f, " - shim(Some({ty}))"),
         InstanceDef::CloneShim(_, ty) => write!(f, " - shim({ty})"),
         InstanceDef::FnPtrAddrShim(_, ty) => write!(f, " - shim({ty})"),
+        InstanceDef::AsyncDropGlueCtorShim(_, None) => write!(f, " - shim(None)"),
+        InstanceDef::AsyncDropGlueCtorShim(_, Some(ty)) => write!(f, " - shim(Some({ty}))"),
     }
 }
 
@@ -638,6 +652,12 @@ impl<'tcx> Instance<'tcx> {
         Instance::expect_resolve(tcx, ty::ParamEnv::reveal_all(), def_id, args)
     }
 
+    pub fn resolve_async_drop_in_place(tcx: TyCtxt<'tcx>, ty: Ty<'tcx>) -> ty::Instance<'tcx> {
+        let def_id = tcx.require_lang_item(LangItem::AsyncDropInPlace, None);
+        let args = tcx.mk_args(&[ty.into()]);
+        Instance::expect_resolve(tcx, ty::ParamEnv::reveal_all(), def_id, args)
+    }
+
     #[instrument(level = "debug", skip(tcx), ret)]
     pub fn fn_once_adapter_instance(
         tcx: TyCtxt<'tcx>,
diff --git a/compiler/rustc_middle/src/ty/mod.rs b/compiler/rustc_middle/src/ty/mod.rs
index e6b773ae512d1..917d5bd774b8e 100644
--- a/compiler/rustc_middle/src/ty/mod.rs
+++ b/compiler/rustc_middle/src/ty/mod.rs
@@ -1797,7 +1797,8 @@ impl<'tcx> TyCtxt<'tcx> {
             | ty::InstanceDef::DropGlue(..)
             | ty::InstanceDef::CloneShim(..)
             | ty::InstanceDef::ThreadLocalShim(..)
-            | ty::InstanceDef::FnPtrAddrShim(..) => self.mir_shims(instance),
+            | ty::InstanceDef::FnPtrAddrShim(..)
+            | ty::InstanceDef::AsyncDropGlueCtorShim(..) => self.mir_shims(instance),
         }
     }
 
diff --git a/compiler/rustc_middle/src/ty/sty.rs b/compiler/rustc_middle/src/ty/sty.rs
index ad64745d579a0..2a2781655c4be 100644
--- a/compiler/rustc_middle/src/ty/sty.rs
+++ b/compiler/rustc_middle/src/ty/sty.rs
@@ -24,6 +24,7 @@ use rustc_target::abi::{FieldIdx, VariantIdx, FIRST_VARIANT};
 use rustc_target::spec::abi::{self, Abi};
 use std::assert_matches::debug_assert_matches;
 use std::borrow::Cow;
+use std::iter;
 use std::ops::{ControlFlow, Deref, Range};
 use ty::util::IntTypeExt;
 
@@ -2316,6 +2317,133 @@ impl<'tcx> Ty<'tcx> {
         }
     }
 
+    /// Returns the type of the async destructor of this type.
+    pub fn async_destructor_ty(self, tcx: TyCtxt<'tcx>, param_env: ParamEnv<'tcx>) -> Ty<'tcx> {
+        if self.is_async_destructor_noop(tcx, param_env) || matches!(self.kind(), ty::Error(_)) {
+            return Ty::async_destructor_combinator(tcx, LangItem::AsyncDropNoop)
+                .instantiate_identity();
+        }
+        match *self.kind() {
+            ty::Param(_) | ty::Alias(..) | ty::Infer(ty::TyVar(_)) => {
+                let assoc_items = tcx
+                    .associated_item_def_ids(tcx.require_lang_item(LangItem::AsyncDestruct, None));
+                Ty::new_projection(tcx, assoc_items[0], [self])
+            }
+
+            ty::Array(elem_ty, _) | ty::Slice(elem_ty) => {
+                let dtor = Ty::async_destructor_combinator(tcx, LangItem::AsyncDropSlice)
+                    .instantiate(tcx, &[elem_ty.into()]);
+                Ty::async_destructor_combinator(tcx, LangItem::AsyncDropFuse)
+                    .instantiate(tcx, &[dtor.into()])
+            }
+
+            ty::Adt(adt_def, args) if adt_def.is_enum() || adt_def.is_struct() => self
+                .adt_async_destructor_ty(
+                    tcx,
+                    adt_def.variants().iter().map(|v| v.fields.iter().map(|f| f.ty(tcx, args))),
+                    param_env,
+                ),
+            ty::Tuple(tys) => self.adt_async_destructor_ty(tcx, iter::once(tys), param_env),
+            ty::Closure(_, args) => self.adt_async_destructor_ty(
+                tcx,
+                iter::once(args.as_closure().upvar_tys()),
+                param_env,
+            ),
+            ty::CoroutineClosure(_, args) => self.adt_async_destructor_ty(
+                tcx,
+                iter::once(args.as_coroutine_closure().upvar_tys()),
+                param_env,
+            ),
+
+            ty::Adt(adt_def, _) => {
+                assert!(adt_def.is_union());
+
+                let surface_drop = self.surface_async_dropper_ty(tcx, param_env).unwrap();
+
+                Ty::async_destructor_combinator(tcx, LangItem::AsyncDropFuse)
+                    .instantiate(tcx, &[surface_drop.into()])
+            }
+
+            ty::Bound(..)
+            | ty::Foreign(_)
+            | ty::Placeholder(_)
+            | ty::Infer(ty::FreshTy(_) | ty::FreshIntTy(_) | ty::FreshFloatTy(_)) => {
+                bug!("`async_destructor_ty` applied to unexpected type: {self:?}")
+            }
+
+            _ => bug!("`async_destructor_ty` is not yet implemented for type: {self:?}"),
+        }
+    }
+
+    fn adt_async_destructor_ty<I>(
+        self,
+        tcx: TyCtxt<'tcx>,
+        variants: I,
+        param_env: ParamEnv<'tcx>,
+    ) -> Ty<'tcx>
+    where
+        I: Iterator + ExactSizeIterator,
+        I::Item: IntoIterator<Item = Ty<'tcx>>,
+    {
+        debug_assert!(!self.is_async_destructor_noop(tcx, param_env));
+
+        let defer = Ty::async_destructor_combinator(tcx, LangItem::AsyncDropDefer);
+        let chain = Ty::async_destructor_combinator(tcx, LangItem::AsyncDropChain);
+
+        let noop =
+            Ty::async_destructor_combinator(tcx, LangItem::AsyncDropNoop).instantiate_identity();
+        let either = Ty::async_destructor_combinator(tcx, LangItem::AsyncDropEither);
+
+        let variants_dtor = variants
+            .into_iter()
+            .map(|variant| {
+                variant
+                    .into_iter()
+                    .map(|ty| defer.instantiate(tcx, &[ty.into()]))
+                    .reduce(|acc, next| chain.instantiate(tcx, &[acc.into(), next.into()]))
+                    .unwrap_or(noop)
+            })
+            .reduce(|other, matched| {
+                either.instantiate(tcx, &[other.into(), matched.into(), self.into()])
+            })
+            .unwrap();
+
+        let dtor = if let Some(dropper_ty) = self.surface_async_dropper_ty(tcx, param_env) {
+            Ty::async_destructor_combinator(tcx, LangItem::AsyncDropChain)
+                .instantiate(tcx, &[dropper_ty.into(), variants_dtor.into()])
+        } else {
+            variants_dtor
+        };
+
+        Ty::async_destructor_combinator(tcx, LangItem::AsyncDropFuse)
+            .instantiate(tcx, &[dtor.into()])
+    }
+
+    fn surface_async_dropper_ty(
+        self,
+        tcx: TyCtxt<'tcx>,
+        param_env: ParamEnv<'tcx>,
+    ) -> Option<Ty<'tcx>> {
+        if self.has_surface_async_drop(tcx, param_env) {
+            Some(LangItem::SurfaceAsyncDropInPlace)
+        } else if self.has_surface_drop(tcx, param_env) {
+            Some(LangItem::AsyncDropSurfaceDropInPlace)
+        } else {
+            None
+        }
+        .map(|dropper| {
+            Ty::async_destructor_combinator(tcx, dropper).instantiate(tcx, &[self.into()])
+        })
+    }
+
+    fn async_destructor_combinator(
+        tcx: TyCtxt<'tcx>,
+        lang_item: LangItem,
+    ) -> ty::EarlyBinder<Ty<'tcx>> {
+        tcx.fn_sig(tcx.require_lang_item(lang_item, None))
+            .map_bound(|fn_sig| fn_sig.output().no_bound_vars().unwrap())
+    }
+
     /// Returns the type of metadata for (potentially fat) pointers to this type,
     /// or the struct tail if the metadata type cannot be determined.
     pub fn ptr_metadata_ty_or_tail(
diff --git a/compiler/rustc_middle/src/ty/util.rs b/compiler/rustc_middle/src/ty/util.rs
index 9af665cfb6fe0..542014dcce7e1 100644
--- a/compiler/rustc_middle/src/ty/util.rs
+++ b/compiler/rustc_middle/src/ty/util.rs
@@ -1303,6 +1303,98 @@ impl<'tcx> Ty<'tcx> {
         }
     }
 
+    /// Checks whether values of this type `T` implements the `AsyncDrop`
+    /// trait.
+    pub fn has_surface_async_drop(self, tcx: TyCtxt<'tcx>, param_env: ty::ParamEnv<'tcx>) -> bool {
+        self.could_have_surface_async_drop() && tcx.has_surface_async_drop_raw(param_env.and(self))
+    }
+
+    /// Fast path helper for testing if a type has `AsyncDrop`
+    /// implementation.
+    ///
+    /// Returning `false` means the type is known to not have `AsyncDrop`
+    /// implementation. Returning `true` means nothing -- could be
+    /// `AsyncDrop`, might not be.
+    fn could_have_surface_async_drop(self) -> bool {
+        !self.is_async_destructor_trivially_noop()
+            && !matches!(
+                self.kind(),
+                ty::Tuple(_)
+                    | ty::Slice(_)
+                    | ty::Array(_, _)
+                    | ty::Closure(..)
+                    | ty::CoroutineClosure(..)
+                    | ty::Coroutine(..)
+            )
+    }
+
+    /// Checks whether values of this type `T` implements the `Drop`
+    /// trait.
+    pub fn has_surface_drop(self, tcx: TyCtxt<'tcx>, param_env: ty::ParamEnv<'tcx>) -> bool {
+        self.could_have_surface_drop() && tcx.has_surface_drop_raw(param_env.and(self))
+    }
+
+    /// Fast path helper for testing if a type has `Drop` implementation.
+    ///
+    /// Returning `false` means the type is known to not have `Drop`
+    /// implementation. Returning `true` means nothing -- could be
+    /// `Drop`, might not be.
+    fn could_have_surface_drop(self) -> bool {
+        !self.is_async_destructor_trivially_noop()
+            && !matches!(
+                self.kind(),
+                ty::Tuple(_)
+                    | ty::Slice(_)
+                    | ty::Array(_, _)
+                    | ty::Closure(..)
+                    | ty::CoroutineClosure(..)
+                    | ty::Coroutine(..)
+            )
+    }
+
+    /// Checks whether values of this type `T` implement has noop async destructor.
+    //
+    // FIXME: implement optimization to make ADTs, which do not need drop,
+    // to skip fields or to have noop async destructor.
+    pub fn is_async_destructor_noop(
+        self,
+        tcx: TyCtxt<'tcx>,
+        param_env: ty::ParamEnv<'tcx>,
+    ) -> bool {
+        self.is_async_destructor_trivially_noop()
+            || if let ty::Adt(adt_def, _) = self.kind() {
+                (adt_def.is_union() || adt_def.is_payloadfree())
+                    && !self.has_surface_async_drop(tcx, param_env)
+                    && !self.has_surface_drop(tcx, param_env)
+            } else {
+                false
+            }
+    }
+
+    /// Fast path helper for testing if a type has noop async destructor.
+    ///
+    /// Returning `true` means the type is known to have noop async destructor
+    /// implementation. Returning `true` means nothing -- could be
+    /// `Drop`, might not be.
+    fn is_async_destructor_trivially_noop(self) -> bool {
+        match self.kind() {
+            ty::Int(_)
+            | ty::Uint(_)
+            | ty::Float(_)
+            | ty::Bool
+            | ty::Char
+            | ty::Str
+            | ty::Never
+            | ty::Ref(..)
+            | ty::RawPtr(..)
+            | ty::FnDef(..)
+            | ty::FnPtr(_) => true,
+            ty::Tuple(tys) => tys.is_empty(),
+            ty::Adt(adt_def, _) => adt_def.is_manually_drop(),
+            _ => false,
+        }
+    }
+
     /// If `ty.needs_drop(...)` returns `true`, then `ty` is definitely
     /// non-copy and *might* have a destructor attached; if it returns
     /// `false`, then `ty` definitely has no destructor (i.e., no drop glue).
diff --git a/compiler/rustc_mir_transform/src/inline.rs b/compiler/rustc_mir_transform/src/inline.rs
index 60513a674af95..1af03a5afe557 100644
--- a/compiler/rustc_mir_transform/src/inline.rs
+++ b/compiler/rustc_mir_transform/src/inline.rs
@@ -332,7 +332,8 @@ impl<'tcx> Inliner<'tcx> {
             | InstanceDef::DropGlue(..)
             | InstanceDef::CloneShim(..)
             | InstanceDef::ThreadLocalShim(..)
-            | InstanceDef::FnPtrAddrShim(..) => return Ok(()),
+            | InstanceDef::FnPtrAddrShim(..)
+            | InstanceDef::AsyncDropGlueCtorShim(..) => return Ok(()),
         }
 
         if self.tcx.is_constructor(callee_def_id) {
@@ -1071,7 +1072,8 @@ fn try_instance_mir<'tcx>(
     tcx: TyCtxt<'tcx>,
     instance: InstanceDef<'tcx>,
 ) -> Result<&'tcx Body<'tcx>, &'static str> {
-    if let ty::InstanceDef::DropGlue(_, Some(ty)) = instance
+    if let ty::InstanceDef::DropGlue(_, Some(ty))
+    | ty::InstanceDef::AsyncDropGlueCtorShim(_, Some(ty)) = instance
         && let ty::Adt(def, args) = ty.kind()
     {
         let fields = def.all_fields();
diff --git a/compiler/rustc_mir_transform/src/inline/cycle.rs b/compiler/rustc_mir_transform/src/inline/cycle.rs
index 99c7b616f1b21..8c5f965108bdc 100644
--- a/compiler/rustc_mir_transform/src/inline/cycle.rs
+++ b/compiler/rustc_mir_transform/src/inline/cycle.rs
@@ -94,8 +94,10 @@ pub(crate) fn mir_callgraph_reachable<'tcx>(
                 | InstanceDef::CloneShim(..) => {}
 
                 // This shim does not call any other functions, thus there can be no recursion.
-                InstanceDef::FnPtrAddrShim(..) => continue,
-                InstanceDef::DropGlue(..) => {
+                InstanceDef::FnPtrAddrShim(..) => {
+                    continue;
+                }
+                InstanceDef::DropGlue(..) | InstanceDef::AsyncDropGlueCtorShim(..) => {
                     // FIXME: A not fully instantiated drop shim can cause ICEs if one attempts to
                     // have its MIR built. Likely oli-obk just screwed up the `ParamEnv`s, so this
                     // needs some more analysis.
diff --git a/compiler/rustc_mir_transform/src/shim.rs b/compiler/rustc_mir_transform/src/shim.rs
index fa6906bdd554f..1c85a604053c7 100644
--- a/compiler/rustc_mir_transform/src/shim.rs
+++ b/compiler/rustc_mir_transform/src/shim.rs
@@ -22,6 +22,8 @@ use crate::{
 use rustc_middle::mir::patch::MirPatch;
 use rustc_mir_dataflow::elaborate_drops::{self, DropElaborator, DropFlagMode, DropStyle};
 
+mod async_destructor_ctor;
+
 pub fn provide(providers: &mut Providers) {
     providers.mir_shims = make_shim;
 }
@@ -127,6 +129,9 @@ fn make_shim<'tcx>(tcx: TyCtxt<'tcx>, instance: ty::InstanceDef<'tcx>) -> Body<'
         ty::InstanceDef::ThreadLocalShim(..) => build_thread_local_shim(tcx, instance),
         ty::InstanceDef::CloneShim(def_id, ty) => build_clone_shim(tcx, def_id, ty),
         ty::InstanceDef::FnPtrAddrShim(def_id, ty) => build_fn_ptr_addr_shim(tcx, def_id, ty),
+        ty::InstanceDef::AsyncDropGlueCtorShim(def_id, ty) => {
+            async_destructor_ctor::build_async_destructor_ctor_shim(tcx, def_id, ty)
+        }
         ty::InstanceDef::Virtual(..) => {
             bug!("InstanceDef::Virtual ({:?}) is for direct calls only", instance)
         }
diff --git a/compiler/rustc_mir_transform/src/shim/async_destructor_ctor.rs b/compiler/rustc_mir_transform/src/shim/async_destructor_ctor.rs
new file mode 100644
index 0000000000000..80eadb9abdcf4
--- /dev/null
+++ b/compiler/rustc_mir_transform/src/shim/async_destructor_ctor.rs
@@ -0,0 +1,618 @@
+use std::iter;
+
+use itertools::Itertools;
+use rustc_ast::Mutability;
+use rustc_const_eval::interpret;
+use rustc_hir::def_id::DefId;
+use rustc_hir::lang_items::LangItem;
+use rustc_index::{Idx, IndexVec};
+use rustc_middle::mir::{
+    BasicBlock, BasicBlockData, Body, CallSource, CastKind, Const, ConstOperand, ConstValue, Local,
+    LocalDecl, MirSource, Operand, Place, PlaceElem, Rvalue, SourceInfo, Statement, StatementKind,
+    Terminator, TerminatorKind, UnwindAction, UnwindTerminateReason, RETURN_PLACE,
+};
+use rustc_middle::ty::adjustment::PointerCoercion;
+use rustc_middle::ty::util::Discr;
+use rustc_middle::ty::{self, Ty, TyCtxt};
+use rustc_span::source_map::respan;
+use rustc_span::{Span, Symbol};
+use rustc_target::abi::{FieldIdx, VariantIdx};
+use rustc_target::spec::PanicStrategy;
+
+use super::{local_decls_for_sig, new_body};
+
+pub fn build_async_destructor_ctor_shim<'tcx>(
+    tcx: TyCtxt<'tcx>,
+    def_id: DefId,
+    ty: Option<Ty<'tcx>>,
+) -> Body<'tcx> {
+    debug!("build_drop_shim(def_id={:?}, ty={:?})", def_id, ty);
+
+    AsyncDestructorCtorShimBuilder::new(tcx, def_id, ty).build()
+}
+
+/// Builder for async_drop_in_place shim. Functions as a stack machine
+/// to build up an expression using combinators. Stack contains pairs
+/// of locals and types. Combinator is a not yet instantiated pair of a
+/// function and a type, is considered to be an operator which consumes
+/// operands from the stack by instantiating its function and its type
+/// with operand types and moving locals into the function call. Top
+/// pair is considered to be the last operand.
+// FIXME: add mir-opt tests
+struct AsyncDestructorCtorShimBuilder<'tcx> {
+    tcx: TyCtxt<'tcx>,
+    def_id: DefId,
+    self_ty: Option<Ty<'tcx>>,
+    span: Span,
+    source_info: SourceInfo,
+    param_env: ty::ParamEnv<'tcx>,
+
+    stack: Vec<Operand<'tcx>>,
+    last_bb: BasicBlock,
+    top_cleanup_bb: Option<BasicBlock>,
+
+    locals: IndexVec<Local, LocalDecl<'tcx>>,
+    bbs: IndexVec<BasicBlock, BasicBlockData<'tcx>>,
+}
+
+#[derive(Clone, Copy)]
+enum SurfaceDropKind {
+    Async,
+    Sync,
+}
+
+impl<'tcx> AsyncDestructorCtorShimBuilder<'tcx> {
+    const SELF_PTR: Local = Local::from_u32(1);
+    const INPUT_COUNT: usize = 1;
+    const MAX_STACK_LEN: usize = 2;
+
+    fn new(tcx: TyCtxt<'tcx>, def_id: DefId, self_ty: Option<Ty<'tcx>>) -> Self {
+        let args = if let Some(ty) = self_ty {
+            tcx.mk_args(&[ty.into()])
+        } else {
+            ty::GenericArgs::identity_for_item(tcx, def_id)
+        };
+        let sig = tcx.fn_sig(def_id).instantiate(tcx, args);
+        let sig = tcx.instantiate_bound_regions_with_erased(sig);
+        let span = tcx.def_span(def_id);
+
+        let source_info = SourceInfo::outermost(span);
+
+        debug_assert_eq!(sig.inputs().len(), Self::INPUT_COUNT);
+        let locals = local_decls_for_sig(&sig, span);
+
+        // Usual case: noop() + unwind resume + return
+        let mut bbs = IndexVec::with_capacity(3);
+        let param_env = tcx.param_env_reveal_all_normalized(def_id);
+        AsyncDestructorCtorShimBuilder {
+            tcx,
+            def_id,
+            self_ty,
+            span,
+            source_info,
+            param_env,
+
+            stack: Vec::with_capacity(Self::MAX_STACK_LEN),
+            last_bb: bbs.push(BasicBlockData::new(None)),
+            top_cleanup_bb: match tcx.sess.panic_strategy() {
+                PanicStrategy::Unwind => {
+                    // Don't drop input arg because it's just a pointer
+                    Some(bbs.push(BasicBlockData {
+                        statements: Vec::new(),
+                        terminator: Some(Terminator {
+                            source_info,
+                            kind: TerminatorKind::UnwindResume,
+                        }),
+                        is_cleanup: true,
+                    }))
+                }
+                PanicStrategy::Abort => None,
+            },
+
+            locals,
+            bbs,
+        }
+    }
+
+    fn build(self) -> Body<'tcx> {
+        let (tcx, def_id, Some(self_ty)) = (self.tcx, self.def_id, self.self_ty) else {
+            return self.build_zst_output();
+        };
+
+        let surface_drop_kind = || {
+            let param_env = tcx.param_env_reveal_all_normalized(def_id);
+            if self_ty.has_surface_async_drop(tcx, param_env) {
+                Some(SurfaceDropKind::Async)
+            } else if self_ty.has_surface_drop(tcx, param_env) {
+                Some(SurfaceDropKind::Sync)
+            } else {
+                None
+            }
+        };
+
+        match self_ty.kind() {
+            ty::Array(elem_ty, _) => self.build_slice(true, *elem_ty),
+            ty::Slice(elem_ty) => self.build_slice(false, *elem_ty),
+
+            ty::Tuple(elem_tys) => self.build_chain(None, elem_tys.iter()),
+            ty::Adt(adt_def, args) if adt_def.is_struct() => {
+                let field_tys = adt_def.non_enum_variant().fields.iter().map(|f| f.ty(tcx, args));
+                self.build_chain(surface_drop_kind(), field_tys)
+            }
+            ty::Closure(_, args) => self.build_chain(None, args.as_closure().upvar_tys().iter()),
+            ty::CoroutineClosure(_, args) => {
+                self.build_chain(None, args.as_coroutine_closure().upvar_tys().iter())
+            }
+
+            ty::Adt(adt_def, args) if adt_def.is_enum() => {
+                self.build_enum(*adt_def, *args, surface_drop_kind())
+            }
+
+            ty::Adt(adt_def, _) => {
+                assert!(adt_def.is_union());
+                match surface_drop_kind().unwrap() {
+                    SurfaceDropKind::Async => self.build_fused_async_surface(),
+                    SurfaceDropKind::Sync => self.build_fused_sync_surface(),
+                }
+            }
+
+            ty::Bound(..)
+            | ty::Foreign(_)
+            | ty::Placeholder(_)
+            | ty::Infer(ty::FreshTy(_) | ty::FreshIntTy(_) | ty::FreshFloatTy(_) | ty::TyVar(_))
+            | ty::Param(_)
+            | ty::Alias(..) => {
+                bug!("Building async destructor for unexpected type: {self_ty:?}")
+            }
+
+            _ => {
+                bug!(
+                    "Building async destructor constructor shim is not yet implemented for type: {self_ty:?}"
+                )
+            }
+        }
+    }
+
+    fn build_enum(
+        mut self,
+        adt_def: ty::AdtDef<'tcx>,
+        args: ty::GenericArgsRef<'tcx>,
+        surface_drop: Option<SurfaceDropKind>,
+    ) -> Body<'tcx> {
+        let tcx = self.tcx;
+
+        let surface = match surface_drop {
+            None => None,
+            Some(kind) => {
+                self.put_self();
+                Some(match kind {
+                    SurfaceDropKind::Async => self.combine_async_surface(),
+                    SurfaceDropKind::Sync => self.combine_sync_surface(),
+                })
+            }
+        };
+
+        let mut other = None;
+        for (variant_idx, discr) in adt_def.discriminants(tcx) {
+            let variant = adt_def.variant(variant_idx);
+
+            let mut chain = None;
+            for (field_idx, field) in variant.fields.iter_enumerated() {
+                let field_ty = field.ty(tcx, args);
+                self.put_variant_field(variant.name, variant_idx, field_idx, field_ty);
+                let defer = self.combine_defer(field_ty);
+                chain = Some(match chain {
+                    None => defer,
+                    Some(chain) => self.combine_chain(chain, defer),
+                })
+            }
+            let variant_dtor = chain.unwrap_or_else(|| self.put_noop());
+
+            other = Some(match other {
+                None => variant_dtor,
+                Some(other) => {
+                    self.put_self();
+                    self.put_discr(discr);
+                    self.combine_either(other, variant_dtor)
+                }
+            });
+        }
+        let variants_dtor = other.unwrap_or_else(|| self.put_noop());
+
+        let dtor = match surface {
+            None => variants_dtor,
+            Some(surface) => self.combine_chain(surface, variants_dtor),
+        };
+        self.combine_fuse(dtor);
+        self.return_()
+    }
+
+    fn build_chain<I>(mut self, surface_drop: Option<SurfaceDropKind>, elem_tys: I) -> Body<'tcx>
+    where
+        I: Iterator<Item = Ty<'tcx>> + ExactSizeIterator,
+    {
+        let surface = match surface_drop {
+            None => None,
+            Some(kind) => {
+                self.put_self();
+                Some(match kind {
+                    SurfaceDropKind::Async => self.combine_async_surface(),
+                    SurfaceDropKind::Sync => self.combine_sync_surface(),
+                })
+            }
+        };
+
+        let mut chain = None;
+        for (field_idx, field_ty) in elem_tys.enumerate().map(|(i, ty)| (FieldIdx::new(i), ty)) {
+            self.put_field(field_idx, field_ty);
+            let defer = self.combine_defer(field_ty);
+            chain = Some(match chain {
+                None => defer,
+                Some(chain) => self.combine_chain(chain, defer),
+            })
+        }
+        let chain = chain.unwrap_or_else(|| self.put_noop());
+
+        let dtor = match surface {
+            None => chain,
+            Some(surface) => self.combine_chain(surface, chain),
+        };
+        self.combine_fuse(dtor);
+        self.return_()
+    }
+
+    fn build_zst_output(mut self) -> Body<'tcx> {
+        self.put_zst_output();
+        self.return_()
+    }
+
+    fn build_fused_async_surface(mut self) -> Body<'tcx> {
+        self.put_self();
+        let surface = self.combine_async_surface();
+        self.combine_fuse(surface);
+        self.return_()
+    }
+
+    fn build_fused_sync_surface(mut self) -> Body<'tcx> {
+        self.put_self();
+        let surface = self.combine_sync_surface();
+        self.combine_fuse(surface);
+        self.return_()
+    }
+
+    fn build_slice(mut self, is_array: bool, elem_ty: Ty<'tcx>) -> Body<'tcx> {
+        if is_array {
+            self.put_array_as_slice(elem_ty)
+        } else {
+            self.put_self()
+        }
+        let dtor = self.combine_slice(elem_ty);
+        self.combine_fuse(dtor);
+        self.return_()
+    }
+
+    fn put_zst_output(&mut self) {
+        let return_ty = self.locals[RETURN_PLACE].ty;
+        self.put_operand(Operand::Constant(Box::new(ConstOperand {
+            span: self.span,
+            user_ty: None,
+            const_: Const::zero_sized(return_ty),
+        })));
+    }
+
+    /// Puts `to_drop: *mut Self` on top of the stack.
+    fn put_self(&mut self) {
+        self.put_operand(Operand::Copy(Self::SELF_PTR.into()))
+    }
+
+    /// Given that `Self is [ElemTy; N]` puts `to_drop: *mut [ElemTy]`
+    /// on top of the stack.
+    fn put_array_as_slice(&mut self, elem_ty: Ty<'tcx>) {
+        let slice_ptr_ty = Ty::new_mut_ptr(self.tcx, Ty::new_slice(self.tcx, elem_ty));
+        self.put_temp_rvalue(Rvalue::Cast(
+            CastKind::PointerCoercion(PointerCoercion::Unsize),
+            Operand::Copy(Self::SELF_PTR.into()),
+            slice_ptr_ty,
+        ))
+    }
+
+    /// If given Self is a struct puts `to_drop: *mut FieldTy` on top
+    /// of the stack.
+    fn put_field(&mut self, field: FieldIdx, field_ty: Ty<'tcx>) {
+        let place = Place {
+            local: Self::SELF_PTR,
+            projection: self
+                .tcx
+                .mk_place_elems(&[PlaceElem::Deref, PlaceElem::Field(field, field_ty)]),
+        };
+        self.put_temp_rvalue(Rvalue::AddressOf(Mutability::Mut, place))
+    }
+
+    /// If given Self is an enum puts `to_drop: *mut FieldTy` on top of
+    /// the stack.
+    fn put_variant_field(
+        &mut self,
+        variant_sym: Symbol,
+        variant: VariantIdx,
+        field: FieldIdx,
+        field_ty: Ty<'tcx>,
+    ) {
+        let place = Place {
+            local: Self::SELF_PTR,
+            projection: self.tcx.mk_place_elems(&[
+                PlaceElem::Deref,
+                PlaceElem::Downcast(Some(variant_sym), variant),
+                PlaceElem::Field(field, field_ty),
+            ]),
+        };
+        self.put_temp_rvalue(Rvalue::AddressOf(Mutability::Mut, place))
+    }
+
+    /// If given Self is an enum puts `to_drop: *mut FieldTy` on top of
+    /// the stack.
+    fn put_discr(&mut self, discr: Discr<'tcx>) {
+        let (size, _) = discr.ty.int_size_and_signed(self.tcx);
+        self.put_operand(Operand::const_from_scalar(
+            self.tcx,
+            discr.ty,
+            interpret::Scalar::from_uint(discr.val, size),
+            self.span,
+        ));
+    }
+
+    /// Puts `x: RvalueType` on top of the stack.
+    fn put_temp_rvalue(&mut self, rvalue: Rvalue<'tcx>) {
+        let last_bb = &mut self.bbs[self.last_bb];
+        debug_assert!(last_bb.terminator.is_none());
+        let source_info = self.source_info;
+
+        let local_ty = rvalue.ty(&self.locals, self.tcx);
+        // We need to create a new local to be able to "consume" it with
+        // a combinator
+        let local = self.locals.push(LocalDecl::with_source_info(local_ty, source_info));
+        last_bb.statements.extend_from_slice(&[
+            Statement { source_info, kind: StatementKind::StorageLive(local) },
+            Statement {
+                source_info,
+                kind: StatementKind::Assign(Box::new((local.into(), rvalue))),
+            },
+        ]);
+
+        self.put_operand(Operand::Move(local.into()));
+    }
+
+    /// Puts operand on top of the stack.
+    fn put_operand(&mut self, operand: Operand<'tcx>) {
+        if let Some(top_cleanup_bb) = &mut self.top_cleanup_bb {
+            let source_info = self.source_info;
+            match &operand {
+                Operand::Copy(_) | Operand::Constant(_) => {
+                    *top_cleanup_bb = self.bbs.push(BasicBlockData {
+                        statements: Vec::new(),
+                        terminator: Some(Terminator {
+                            source_info,
+                            kind: TerminatorKind::Goto { target: *top_cleanup_bb },
+                        }),
+                        is_cleanup: true,
+                    });
+                }
+                Operand::Move(place) => {
+                    let local = place.as_local().unwrap();
+                    *top_cleanup_bb = self.bbs.push(BasicBlockData {
+                        statements: Vec::new(),
+                        terminator: Some(Terminator {
+                            source_info,
+                            kind: if self.locals[local].ty.needs_drop(self.tcx, self.param_env) {
+                                TerminatorKind::Drop {
+                                    place: local.into(),
+                                    target: *top_cleanup_bb,
+                                    unwind: UnwindAction::Terminate(
+                                        UnwindTerminateReason::InCleanup,
+                                    ),
+                                    replace: false,
+                                }
+                            } else {
+                                TerminatorKind::Goto { target: *top_cleanup_bb }
+                            },
+                        }),
+                        is_cleanup: true,
+                    });
+                }
+            };
+        }
+        self.stack.push(operand);
+    }
+
+    /// Puts `noop: async_drop::Noop` on top of the stack
+    fn put_noop(&mut self) -> Ty<'tcx> {
+        self.apply_combinator(0, LangItem::AsyncDropNoop, &[])
+    }
+
+    fn combine_async_surface(&mut self) -> Ty<'tcx> {
+        self.apply_combinator(1, LangItem::SurfaceAsyncDropInPlace, &[self.self_ty.unwrap().into()])
+    }
+
+    fn combine_sync_surface(&mut self) -> Ty<'tcx> {
+        self.apply_combinator(
+            1,
+            LangItem::AsyncDropSurfaceDropInPlace,
+            &[self.self_ty.unwrap().into()],
+        )
+    }
+
+    fn combine_fuse(&mut self, inner_future_ty: Ty<'tcx>) -> Ty<'tcx> {
+        self.apply_combinator(1, LangItem::AsyncDropFuse, &[inner_future_ty.into()])
+    }
+
+    fn combine_slice(&mut self, elem_ty: Ty<'tcx>) -> Ty<'tcx> {
+        self.apply_combinator(1, LangItem::AsyncDropSlice, &[elem_ty.into()])
+    }
+
+    fn combine_defer(&mut self, to_drop_ty: Ty<'tcx>) -> Ty<'tcx> {
+        self.apply_combinator(1, LangItem::AsyncDropDefer, &[to_drop_ty.into()])
+    }
+
+    fn combine_chain(&mut self, first: Ty<'tcx>, second: Ty<'tcx>) -> Ty<'tcx> {
+        self.apply_combinator(2, LangItem::AsyncDropChain, &[first.into(), second.into()])
+    }
+
+    fn combine_either(&mut self, other: Ty<'tcx>, matched: Ty<'tcx>) -> Ty<'tcx> {
+        self.apply_combinator(
+            4,
+            LangItem::AsyncDropEither,
+            &[other.into(), matched.into(), self.self_ty.unwrap().into()],
+        )
+    }
+
+    fn return_(mut self) -> Body<'tcx> {
+        let last_bb = &mut self.bbs[self.last_bb];
+        debug_assert!(last_bb.terminator.is_none());
+        let source_info = self.source_info;
+
+        let (1, Some(output)) = (self.stack.len(), self.stack.pop()) else {
+            span_bug!(
+                self.span,
+                "async destructor ctor shim builder finished with invalid number of stack items: expected 1 found {}",
+                self.stack.len(),
+            )
+        };
+        #[cfg(debug_assertions)]
+        if let Some(ty) = self.self_ty {
+            debug_assert_eq!(
+                output.ty(&self.locals, self.tcx),
+                ty.async_destructor_ty(self.tcx, self.param_env),
+                "output async destructor types did not match for type: {ty:?}",
+            );
+        }
+
+        let dead_storage = match &output {
+            Operand::Move(place) => Some(Statement {
+                source_info,
+                kind: StatementKind::StorageDead(place.as_local().unwrap()),
+            }),
+            _ => None,
+        };
+
+        last_bb.statements.extend(
+            iter::once(Statement {
+                source_info,
+                kind: StatementKind::Assign(Box::new((RETURN_PLACE.into(), Rvalue::Use(output)))),
+            })
+            .chain(dead_storage),
+        );
+
+        last_bb.terminator = Some(Terminator { source_info, kind: TerminatorKind::Return });
+
+        let source = MirSource::from_instance(ty::InstanceDef::AsyncDropGlueCtorShim(
+            self.def_id,
+            self.self_ty,
+        ));
+        new_body(source, self.bbs, self.locals, Self::INPUT_COUNT, self.span)
+    }
+
+    fn apply_combinator(
+        &mut self,
+        arity: usize,
+        function: LangItem,
+        args: &[ty::GenericArg<'tcx>],
+    ) -> Ty<'tcx> {
+        let function = self.tcx.require_lang_item(function, Some(self.span));
+        let operands_split = self
+            .stack
+            .len()
+            .checked_sub(arity)
+            .expect("async destructor ctor shim combinator tried to consume too many items");
+        let operands = &self.stack[operands_split..];
+
+        let func_ty = Ty::new_fn_def(self.tcx, function, args.iter().copied());
+        let func_sig = func_ty.fn_sig(self.tcx).no_bound_vars().unwrap();
+        #[cfg(debug_assertions)]
+        operands.iter().zip(func_sig.inputs()).for_each(|(operand, expected_ty)| {
+            let operand_ty = operand.ty(&self.locals, self.tcx);
+            if operand_ty == *expected_ty {
+                return;
+            }
+
+            // If projection of Discriminant then compare with `Ty::discriminant_ty`
+            if let ty::Alias(ty::AliasKind::Projection, ty::AliasTy { args, def_id, .. }) =
+                expected_ty.kind()
+                && Some(*def_id) == self.tcx.lang_items().discriminant_type()
+                && args.first().unwrap().as_type().unwrap().discriminant_ty(self.tcx) == operand_ty
+            {
+                return;
+            }
+
+            span_bug!(
+                self.span,
+                "Operand type and combinator argument type are not equal.
+    operand_ty: {:?}
+    argument_ty: {:?}
+",
+                operand_ty,
+                expected_ty
+            );
+        });
+
+        let target = self.bbs.push(BasicBlockData {
+            statements: operands
+                .iter()
+                .rev()
+                .filter_map(|o| {
+                    if let Operand::Move(Place { local, projection }) = o {
+                        assert!(projection.is_empty());
+                        Some(Statement {
+                            source_info: self.source_info,
+                            kind: StatementKind::StorageDead(*local),
+                        })
+                    } else {
+                        None
+                    }
+                })
+                .collect(),
+            terminator: None,
+            is_cleanup: false,
+        });
+
+        let dest_ty = func_sig.output();
+        let dest =
+            self.locals.push(LocalDecl::with_source_info(dest_ty, self.source_info).immutable());
+
+        let unwind = if let Some(top_cleanup_bb) = &mut self.top_cleanup_bb {
+            for _ in 0..arity {
+                *top_cleanup_bb =
+                    self.bbs[*top_cleanup_bb].terminator().successors().exactly_one().ok().unwrap();
+            }
+            UnwindAction::Cleanup(*top_cleanup_bb)
+        } else {
+            UnwindAction::Unreachable
+        };
+
+        let last_bb = &mut self.bbs[self.last_bb];
+        debug_assert!(last_bb.terminator.is_none());
+        last_bb.statements.push(Statement {
+            source_info: self.source_info,
+            kind: StatementKind::StorageLive(dest),
+        });
+        last_bb.terminator = Some(Terminator {
+            source_info: self.source_info,
+            kind: TerminatorKind::Call {
+                func: Operand::Constant(Box::new(ConstOperand {
+                    span: self.span,
+                    user_ty: None,
+                    const_: Const::Val(ConstValue::ZeroSized, func_ty),
+                })),
+                destination: dest.into(),
+                target: Some(target),
+                unwind,
+                call_source: CallSource::Misc,
+                fn_span: self.span,
+                args: self.stack.drain(operands_split..).map(|o| respan(self.span, o)).collect(),
+            },
+        });
+
+        self.put_operand(Operand::Move(dest.into()));
+        self.last_bb = target;
+
+        dest_ty
+    }
+}
diff --git a/compiler/rustc_monomorphize/src/collector.rs b/compiler/rustc_monomorphize/src/collector.rs
index 36d623fd93e12..a8fa6fe002dbd 100644
--- a/compiler/rustc_monomorphize/src/collector.rs
+++ b/compiler/rustc_monomorphize/src/collector.rs
@@ -966,13 +966,14 @@ fn visit_instance_use<'tcx>(
         ty::InstanceDef::ThreadLocalShim(..) => {
             bug!("{:?} being reified", instance);
         }
-        ty::InstanceDef::DropGlue(_, None) => {
+        ty::InstanceDef::DropGlue(_, None) | ty::InstanceDef::AsyncDropGlueCtorShim(_, None) => {
             // Don't need to emit noop drop glue if we are calling directly.
             if !is_direct_call {
                 output.push(create_fn_mono_item(tcx, instance, source));
             }
         }
         ty::InstanceDef::DropGlue(_, Some(_))
+        | ty::InstanceDef::AsyncDropGlueCtorShim(_, Some(_))
         | ty::InstanceDef::VTableShim(..)
         | ty::InstanceDef::ReifyShim(..)
         | ty::InstanceDef::ClosureOnceShim { .. }
diff --git a/compiler/rustc_monomorphize/src/partitioning.rs b/compiler/rustc_monomorphize/src/partitioning.rs
index 5a92657cb40d7..23e07890bb6c5 100644
--- a/compiler/rustc_monomorphize/src/partitioning.rs
+++ b/compiler/rustc_monomorphize/src/partitioning.rs
@@ -625,7 +625,8 @@ fn characteristic_def_id_of_mono_item<'tcx>(
                 | ty::InstanceDef::Virtual(..)
                 | ty::InstanceDef::CloneShim(..)
                 | ty::InstanceDef::ThreadLocalShim(..)
-                | ty::InstanceDef::FnPtrAddrShim(..) => return None,
+                | ty::InstanceDef::FnPtrAddrShim(..)
+                | ty::InstanceDef::AsyncDropGlueCtorShim(..) => return None,
             };
 
             // If this is a method, we want to put it into the same module as
@@ -769,7 +770,9 @@ fn mono_item_visibility<'tcx>(
     };
 
     let def_id = match instance.def {
-        InstanceDef::Item(def_id) | InstanceDef::DropGlue(def_id, Some(_)) => def_id,
+        InstanceDef::Item(def_id)
+        | InstanceDef::DropGlue(def_id, Some(_))
+        | InstanceDef::AsyncDropGlueCtorShim(def_id, Some(_)) => def_id,
 
         // We match the visibility of statics here
         InstanceDef::ThreadLocalShim(def_id) => {
@@ -786,6 +789,7 @@ fn mono_item_visibility<'tcx>(
         | InstanceDef::ConstructCoroutineInClosureShim { .. }
         | InstanceDef::CoroutineKindShim { .. }
         | InstanceDef::DropGlue(..)
+        | InstanceDef::AsyncDropGlueCtorShim(..)
         | InstanceDef::CloneShim(..)
         | InstanceDef::FnPtrAddrShim(..) => return Visibility::Hidden,
     };
diff --git a/compiler/rustc_smir/src/rustc_smir/context.rs b/compiler/rustc_smir/src/rustc_smir/context.rs
index 61bbedf9eec9d..fd31c020f899a 100644
--- a/compiler/rustc_smir/src/rustc_smir/context.rs
+++ b/compiler/rustc_smir/src/rustc_smir/context.rs
@@ -500,6 +500,12 @@ impl<'tcx> Context for TablesWrapper<'tcx> {
         matches!(instance.def, ty::InstanceDef::DropGlue(_, None))
     }
 
+    fn is_empty_async_drop_ctor_shim(&self, def: InstanceDef) -> bool {
+        let tables = self.0.borrow_mut();
+        let instance = tables.instances[def];
+        matches!(instance.def, ty::InstanceDef::AsyncDropGlueCtorShim(_, None))
+    }
+
     fn mono_instance(&self, def_id: stable_mir::DefId) -> stable_mir::mir::mono::Instance {
         let mut tables = self.0.borrow_mut();
         let def_id = tables[def_id];
diff --git a/compiler/rustc_smir/src/rustc_smir/convert/ty.rs b/compiler/rustc_smir/src/rustc_smir/convert/ty.rs
index 112e44f674efb..4abf991fba25d 100644
--- a/compiler/rustc_smir/src/rustc_smir/convert/ty.rs
+++ b/compiler/rustc_smir/src/rustc_smir/convert/ty.rs
@@ -807,7 +807,10 @@ impl<'tcx> Stable<'tcx> for ty::Instance<'tcx> {
             | ty::InstanceDef::ThreadLocalShim(..)
             | ty::InstanceDef::DropGlue(..)
             | ty::InstanceDef::CloneShim(..)
-            | ty::InstanceDef::FnPtrShim(..) => stable_mir::mir::mono::InstanceKind::Shim,
+            | ty::InstanceDef::FnPtrShim(..)
+            | ty::InstanceDef::AsyncDropGlueCtorShim(..) => {
+                stable_mir::mir::mono::InstanceKind::Shim
+            }
         };
         stable_mir::mir::mono::Instance { def, kind }
     }
diff --git a/compiler/rustc_span/src/symbol.rs b/compiler/rustc_span/src/symbol.rs
index 19c3fc5894308..fda40e314cee4 100644
--- a/compiler/rustc_span/src/symbol.rs
+++ b/compiler/rustc_span/src/symbol.rs
@@ -424,6 +424,16 @@ symbols! {
         async_call_mut,
         async_call_once,
         async_closure,
+        async_destruct,
+        async_drop,
+        async_drop_chain,
+        async_drop_defer,
+        async_drop_either,
+        async_drop_fuse,
+        async_drop_in_place,
+        async_drop_noop,
+        async_drop_slice,
+        async_drop_surface_drop_in_place,
         async_fn,
         async_fn_in_trait,
         async_fn_kind_helper,
@@ -825,6 +835,7 @@ symbols! {
         fadd_fast,
         fake_variadic,
         fallback,
+        fallback_surface_drop,
         fdiv_algebraic,
         fdiv_fast,
         feature,
@@ -1786,6 +1797,7 @@ symbols! {
         sub_assign,
         sub_with_overflow,
         suggestion,
+        surface_async_drop_in_place,
         sym,
         sync,
         synthetic,
diff --git a/compiler/rustc_symbol_mangling/src/legacy.rs b/compiler/rustc_symbol_mangling/src/legacy.rs
index f68668a91e688..f1c3512315ff1 100644
--- a/compiler/rustc_symbol_mangling/src/legacy.rs
+++ b/compiler/rustc_symbol_mangling/src/legacy.rs
@@ -55,7 +55,9 @@ pub(super) fn mangle<'tcx>(
     printer
         .print_def_path(
             def_id,
-            if let ty::InstanceDef::DropGlue(_, _) = instance.def {
+            if let ty::InstanceDef::DropGlue(_, _) | ty::InstanceDef::AsyncDropGlueCtorShim(_, _) =
+                instance.def
+            {
                 // Add the name of the dropped type to the symbol name
                 &*instance.args
             } else {
diff --git a/compiler/rustc_trait_selection/src/solve/assembly/mod.rs b/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
index 8b5c029428cfc..68b0db2114132 100644
--- a/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
+++ b/compiler/rustc_trait_selection/src/solve/assembly/mod.rs
@@ -240,6 +240,11 @@ pub(super) trait GoalKind<'tcx>:
         goal: Goal<'tcx, Self>,
     ) -> QueryResult<'tcx>;
 
+    fn consider_builtin_async_destruct_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx>;
+
     fn consider_builtin_destruct_candidate(
         ecx: &mut EvalCtxt<'_, 'tcx>,
         goal: Goal<'tcx, Self>,
@@ -520,6 +525,8 @@ impl<'tcx> EvalCtxt<'_, 'tcx> {
             G::consider_builtin_coroutine_candidate(self, goal)
         } else if lang_items.discriminant_kind_trait() == Some(trait_def_id) {
             G::consider_builtin_discriminant_kind_candidate(self, goal)
+        } else if lang_items.async_destruct_trait() == Some(trait_def_id) {
+            G::consider_builtin_async_destruct_candidate(self, goal)
         } else if lang_items.destruct_trait() == Some(trait_def_id) {
             G::consider_builtin_destruct_candidate(self, goal)
         } else if lang_items.transmute_trait() == Some(trait_def_id) {
diff --git a/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs b/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
index ebf2a0d96213f..c662ab23c53b6 100644
--- a/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
+++ b/compiler/rustc_trait_selection/src/solve/normalizes_to/mod.rs
@@ -814,6 +814,59 @@ impl<'tcx> assembly::GoalKind<'tcx> for NormalizesTo<'tcx> {
         })
     }
 
+    fn consider_builtin_async_destruct_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx> {
+        let self_ty = goal.predicate.self_ty();
+        let async_destructor_ty = match *self_ty.kind() {
+            ty::Bool
+            | ty::Char
+            | ty::Int(..)
+            | ty::Uint(..)
+            | ty::Float(..)
+            | ty::Array(..)
+            | ty::RawPtr(..)
+            | ty::Ref(..)
+            | ty::FnDef(..)
+            | ty::FnPtr(..)
+            | ty::Closure(..)
+            | ty::CoroutineClosure(..)
+            | ty::Infer(ty::IntVar(..) | ty::FloatVar(..))
+            | ty::Never
+            | ty::Adt(_, _)
+            | ty::Str
+            | ty::Slice(_)
+            | ty::Tuple(_)
+            | ty::Error(_) => self_ty.async_destructor_ty(ecx.tcx(), goal.param_env),
+
+            // We do not call `Ty::async_destructor_ty` on alias, param, or placeholder
+            // types, which return `<self_ty as AsyncDestruct>::AsyncDestructor`
+            // (or ICE in the case of placeholders). Projecting a type to itself
+            // is never really productive.
+            ty::Alias(_, _) | ty::Param(_) | ty::Placeholder(..) => {
+                return Err(NoSolution);
+            }
+
+            ty::Infer(ty::TyVar(_) | ty::FreshTy(_) | ty::FreshIntTy(_) | ty::FreshFloatTy(_))
+            | ty::Foreign(..)
+            | ty::Bound(..) => bug!(
+                "unexpected self ty `{:?}` when normalizing `<T as AsyncDestruct>::AsyncDestructor`",
+                goal.predicate.self_ty()
+            ),
+
+            ty::Pat(..) | ty::Dynamic(..) | ty::Coroutine(..) | ty::CoroutineWitness(..) => bug!(
+                "`consider_builtin_async_destruct_candidate` is not yet implemented for type: {self_ty:?}"
+            ),
+        };
+
+        ecx.probe_misc_candidate("builtin async destruct").enter(|ecx| {
+            ecx.eq(goal.param_env, goal.predicate.term, async_destructor_ty.into())
+                .expect("expected goal term to be fully unconstrained");
+            ecx.evaluate_added_goals_and_make_canonical_response(Certainty::Yes)
+        })
+    }
+
     fn consider_builtin_destruct_candidate(
         _ecx: &mut EvalCtxt<'_, 'tcx>,
         goal: Goal<'tcx, Self>,
diff --git a/compiler/rustc_trait_selection/src/solve/trait_goals.rs b/compiler/rustc_trait_selection/src/solve/trait_goals.rs
index e522339358ade..ed76ea74f08f9 100644
--- a/compiler/rustc_trait_selection/src/solve/trait_goals.rs
+++ b/compiler/rustc_trait_selection/src/solve/trait_goals.rs
@@ -544,6 +544,18 @@ impl<'tcx> assembly::GoalKind<'tcx> for TraitPredicate<'tcx> {
         ecx.evaluate_added_goals_and_make_canonical_response(Certainty::Yes)
     }
 
+    fn consider_builtin_async_destruct_candidate(
+        ecx: &mut EvalCtxt<'_, 'tcx>,
+        goal: Goal<'tcx, Self>,
+    ) -> QueryResult<'tcx> {
+        if goal.predicate.polarity != ty::PredicatePolarity::Positive {
+            return Err(NoSolution);
+        }
+
+        // `AsyncDestruct` is automatically implemented for every type.
+        ecx.evaluate_added_goals_and_make_canonical_response(Certainty::Yes)
+    }
+
     fn consider_builtin_destruct_candidate(
         ecx: &mut EvalCtxt<'_, 'tcx>,
         goal: Goal<'tcx, Self>,
diff --git a/compiler/rustc_trait_selection/src/traits/project.rs b/compiler/rustc_trait_selection/src/traits/project.rs
index 4a8df6c6a5ba6..47df11ff7f805 100644
--- a/compiler/rustc_trait_selection/src/traits/project.rs
+++ b/compiler/rustc_trait_selection/src/traits/project.rs
@@ -1074,6 +1074,42 @@ fn assemble_candidates_from_impls<'cx, 'tcx>(
                         | ty::Infer(..)
                         | ty::Error(_) => false,
                     }
+                } else if lang_items.async_destruct_trait() == Some(trait_ref.def_id) {
+                    match self_ty.kind() {
+                        ty::Bool
+                        | ty::Char
+                        | ty::Int(_)
+                        | ty::Uint(_)
+                        | ty::Float(_)
+                        | ty::Adt(..)
+                        | ty::Str
+                        | ty::Array(..)
+                        | ty::Slice(_)
+                        | ty::RawPtr(..)
+                        | ty::Ref(..)
+                        | ty::FnDef(..)
+                        | ty::FnPtr(..)
+                        | ty::Dynamic(..)
+                        | ty::Closure(..)
+                        | ty::CoroutineClosure(..)
+                        | ty::Coroutine(..)
+                        | ty::CoroutineWitness(..)
+                        | ty::Pat(..)
+                        | ty::Never
+                        | ty::Tuple(..)
+                        | ty::Infer(ty::InferTy::IntVar(_) | ty::InferTy::FloatVar(..)) => true,
+
+                        // type parameters, opaques, and unnormalized projections don't have
+                        // a known async destructor and may need to be normalized further or rely
+                        // on param env for async destructor projections
+                        ty::Param(_)
+                        | ty::Foreign(_)
+                        | ty::Alias(..)
+                        | ty::Bound(..)
+                        | ty::Placeholder(..)
+                        | ty::Infer(_)
+                        | ty::Error(_) => false,
+                    }
                 } else if lang_items.pointee_trait() == Some(trait_ref.def_id) {
                     let tail = selcx.tcx().struct_tail_with_normalize(
                         self_ty,
@@ -1488,15 +1524,20 @@ fn confirm_builtin_candidate<'cx, 'tcx>(
 ) -> Progress<'tcx> {
     let tcx = selcx.tcx();
     let self_ty = obligation.predicate.self_ty();
-    let args = tcx.mk_args(&[self_ty.into()]);
     let lang_items = tcx.lang_items();
     let item_def_id = obligation.predicate.def_id;
     let trait_def_id = tcx.trait_of_item(item_def_id).unwrap();
+    let args = tcx.mk_args(&[self_ty.into()]);
     let (term, obligations) = if lang_items.discriminant_kind_trait() == Some(trait_def_id) {
         let discriminant_def_id = tcx.require_lang_item(LangItem::Discriminant, None);
         assert_eq!(discriminant_def_id, item_def_id);
 
         (self_ty.discriminant_ty(tcx).into(), Vec::new())
+    } else if lang_items.async_destruct_trait() == Some(trait_def_id) {
+        let destructor_def_id = tcx.associated_item_def_ids(trait_def_id)[0];
+        assert_eq!(destructor_def_id, item_def_id);
+
+        (self_ty.async_destructor_ty(tcx, obligation.param_env).into(), Vec::new())
     } else if lang_items.pointee_trait() == Some(trait_def_id) {
         let metadata_def_id = tcx.require_lang_item(LangItem::Metadata, None);
         assert_eq!(metadata_def_id, item_def_id);
diff --git a/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs b/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
index 3ef7cc01f9021..61fe66be6f522 100644
--- a/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
+++ b/compiler/rustc_trait_selection/src/traits/select/candidate_assembly.rs
@@ -81,6 +81,9 @@ impl<'cx, 'tcx> SelectionContext<'cx, 'tcx> {
             } else if lang_items.discriminant_kind_trait() == Some(def_id) {
                 // `DiscriminantKind` is automatically implemented for every type.
                 candidates.vec.push(BuiltinCandidate { has_nested: false });
+            } else if lang_items.async_destruct_trait() == Some(def_id) {
+                // `AsyncDestruct` is automatically implemented for every type.
+                candidates.vec.push(BuiltinCandidate { has_nested: false });
             } else if lang_items.pointee_trait() == Some(def_id) {
                 // `Pointee` is automatically implemented for every type.
                 candidates.vec.push(BuiltinCandidate { has_nested: false });
diff --git a/compiler/rustc_ty_utils/src/common_traits.rs b/compiler/rustc_ty_utils/src/common_traits.rs
index 51b908881eb49..cb95239e99192 100644
--- a/compiler/rustc_ty_utils/src/common_traits.rs
+++ b/compiler/rustc_ty_utils/src/common_traits.rs
@@ -22,6 +22,17 @@ fn is_unpin_raw<'tcx>(tcx: TyCtxt<'tcx>, query: ty::ParamEnvAnd<'tcx, Ty<'tcx>>)
     is_item_raw(tcx, query, LangItem::Unpin)
 }
 
+fn has_surface_async_drop_raw<'tcx>(
+    tcx: TyCtxt<'tcx>,
+    query: ty::ParamEnvAnd<'tcx, Ty<'tcx>>,
+) -> bool {
+    is_item_raw(tcx, query, LangItem::AsyncDrop)
+}
+
+fn has_surface_drop_raw<'tcx>(tcx: TyCtxt<'tcx>, query: ty::ParamEnvAnd<'tcx, Ty<'tcx>>) -> bool {
+    is_item_raw(tcx, query, LangItem::Drop)
+}
+
 fn is_item_raw<'tcx>(
     tcx: TyCtxt<'tcx>,
     query: ty::ParamEnvAnd<'tcx, Ty<'tcx>>,
@@ -34,5 +45,13 @@ fn is_item_raw<'tcx>(
 }
 
 pub(crate) fn provide(providers: &mut Providers) {
-    *providers = Providers { is_copy_raw, is_sized_raw, is_freeze_raw, is_unpin_raw, ..*providers };
+    *providers = Providers {
+        is_copy_raw,
+        is_sized_raw,
+        is_freeze_raw,
+        is_unpin_raw,
+        has_surface_async_drop_raw,
+        has_surface_drop_raw,
+        ..*providers
+    };
 }
diff --git a/compiler/rustc_ty_utils/src/instance.rs b/compiler/rustc_ty_utils/src/instance.rs
index a8f9afb87dd7d..c1661fa63a8b8 100644
--- a/compiler/rustc_ty_utils/src/instance.rs
+++ b/compiler/rustc_ty_utils/src/instance.rs
@@ -54,6 +54,28 @@ fn resolve_instance<'tcx>(
                 debug!(" => trivial drop glue");
                 ty::InstanceDef::DropGlue(def_id, None)
             }
+        } else if Some(def_id) == tcx.lang_items().async_drop_in_place_fn() {
+            let ty = args.type_at(0);
+
+            if !ty.is_async_destructor_noop(tcx, param_env) {
+                match *ty.kind() {
+                    ty::Closure(..)
+                    | ty::CoroutineClosure(..)
+                    | ty::Coroutine(..)
+                    | ty::Tuple(..)
+                    | ty::Adt(..)
+                    | ty::Dynamic(..)
+                    | ty::Array(..)
+                    | ty::Slice(..) => {}
+                    // Async destructor ctor shims can only be built from ADTs.
+                    _ => return Ok(None),
+                }
+                debug!(" => nontrivial async drop glue ctor");
+                ty::InstanceDef::AsyncDropGlueCtorShim(def_id, Some(ty))
+            } else {
+                debug!(" => trivial async drop glue ctor");
+                ty::InstanceDef::AsyncDropGlueCtorShim(def_id, None)
+            }
         } else {
             debug!(" => free item");
             // FIXME(effects): we may want to erase the effect param if that is present on this item.
diff --git a/compiler/stable_mir/src/compiler_interface.rs b/compiler/stable_mir/src/compiler_interface.rs
index 94c552199bc5f..e82d1f4813ea0 100644
--- a/compiler/stable_mir/src/compiler_interface.rs
+++ b/compiler/stable_mir/src/compiler_interface.rs
@@ -158,6 +158,9 @@ pub trait Context {
     /// Check if this is an empty DropGlue shim.
     fn is_empty_drop_shim(&self, def: InstanceDef) -> bool;
 
+    /// Check if this is an empty AsyncDropGlueCtor shim.
+    fn is_empty_async_drop_ctor_shim(&self, def: InstanceDef) -> bool;
+
     /// Convert a non-generic crate item into an instance.
     /// This function will panic if the item is generic.
     fn mono_instance(&self, def_id: DefId) -> Instance;
diff --git a/compiler/stable_mir/src/mir/mono.rs b/compiler/stable_mir/src/mir/mono.rs
index a032a180fcfdd..394038926f61a 100644
--- a/compiler/stable_mir/src/mir/mono.rs
+++ b/compiler/stable_mir/src/mir/mono.rs
@@ -157,7 +157,10 @@ impl Instance {
     /// When generating code for a Drop terminator, users can ignore an empty drop glue.
     /// These shims are only needed to generate a valid Drop call done via VTable.
     pub fn is_empty_shim(&self) -> bool {
-        self.kind == InstanceKind::Shim && with(|cx| cx.is_empty_drop_shim(self.def))
+        self.kind == InstanceKind::Shim
+            && with(|cx| {
+                cx.is_empty_drop_shim(self.def) || cx.is_empty_async_drop_ctor_shim(self.def)
+            })
     }
 
     /// Try to constant evaluate the instance into a constant with the given type.
diff --git a/library/core/src/future/async_drop.rs b/library/core/src/future/async_drop.rs
new file mode 100644
index 0000000000000..0eb8d7bb32899
--- /dev/null
+++ b/library/core/src/future/async_drop.rs
@@ -0,0 +1,271 @@
+#![unstable(feature = "async_drop", issue = "none")]
+
+use crate::fmt;
+use crate::future::{Future, IntoFuture};
+use crate::intrinsics::discriminant_value;
+use crate::marker::{DiscriminantKind, PhantomPinned};
+use crate::mem::MaybeUninit;
+use crate::pin::Pin;
+use crate::task::{ready, Context, Poll};
+
+/// Asynchronously drops a value by running `AsyncDrop::async_drop`
+/// on a value and its fields recursively.
+#[unstable(feature = "async_drop", issue = "none")]
+pub fn async_drop<T>(value: T) -> AsyncDropOwning<T> {
+    AsyncDropOwning { value: MaybeUninit::new(value), dtor: None, _pinned: PhantomPinned }
+}
+
+/// A future returned by the [`async_drop`].
+#[unstable(feature = "async_drop", issue = "none")]
+pub struct AsyncDropOwning<T> {
+    value: MaybeUninit<T>,
+    dtor: Option<AsyncDropInPlace<T>>,
+    _pinned: PhantomPinned,
+}
+
+#[unstable(feature = "async_drop", issue = "none")]
+impl<T> fmt::Debug for AsyncDropOwning<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AsyncDropOwning").finish_non_exhaustive()
+    }
+}
+
+#[unstable(feature = "async_drop", issue = "none")]
+impl<T> Future for AsyncDropOwning<T> {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        // SAFETY: Self is pinned thus it is ok to store references to self
+        unsafe {
+            let this = self.get_unchecked_mut();
+            let dtor = Pin::new_unchecked(
+                this.dtor.get_or_insert_with(|| async_drop_in_place(this.value.as_mut_ptr())),
+            );
+            // AsyncDestuctors are idempotent so Self gets idempotency as well
+            dtor.poll(cx)
+        }
+    }
+}
+
+#[lang = "async_drop_in_place"]
+#[allow(unconditional_recursion)]
+// FIXME: Consider if `#[rustc_diagnostic_item = "ptr_drop_in_place"]` is needed?
+unsafe fn async_drop_in_place_raw<T: ?Sized>(
+    to_drop: *mut T,
+) -> <T as AsyncDestruct>::AsyncDestructor {
+    // Code here does not matter - this is replaced by the
+    // real async drop glue constructor by the compiler.
+
+    // SAFETY: see comment above
+    unsafe { async_drop_in_place_raw(to_drop) }
+}
+
+/// Creates the asynchronous destructor of the pointed-to value.
+///
+/// # Safety
+///
+/// Behavior is undefined if any of the following conditions are violated:
+///
+/// * `to_drop` must be [valid](crate::ptr#safety) for both reads and writes.
+///
+/// * `to_drop` must be properly aligned, even if `T` has size 0.
+///
+/// * `to_drop` must be nonnull, even if `T` has size 0.
+///
+/// * The value `to_drop` points to must be valid for async dropping,
+///   which may mean it must uphold additional invariants. These
+///   invariants depend on the type of the value being dropped. For
+///   instance, when dropping a Box, the box's pointer to the heap must
+///   be valid.
+///
+/// * While `async_drop_in_place` is executing or the returned async
+///   destructor is alive, the only way to access parts of `to_drop`
+///   is through the `self: Pin<&mut Self>` references supplied to
+///   the `AsyncDrop::async_drop` methods that `async_drop_in_place`
+///   or `AsyncDropInPlace<T>::poll` invokes. This usually means the
+///   returned future stores the `to_drop` pointer and user is required
+///   to guarantee that dropped value doesn't move.
+///
+#[unstable(feature = "async_drop", issue = "none")]
+pub unsafe fn async_drop_in_place<T: ?Sized>(to_drop: *mut T) -> AsyncDropInPlace<T> {
+    // SAFETY: `async_drop_in_place_raw` has the same safety requirements
+    unsafe { AsyncDropInPlace(async_drop_in_place_raw(to_drop)) }
+}
+
+/// A future returned by the [`async_drop_in_place`].
+#[unstable(feature = "async_drop", issue = "none")]
+pub struct AsyncDropInPlace<T: ?Sized>(<T as AsyncDestruct>::AsyncDestructor);
+
+#[unstable(feature = "async_drop", issue = "none")]
+impl<T: ?Sized> fmt::Debug for AsyncDropInPlace<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AsyncDropInPlace").finish_non_exhaustive()
+    }
+}
+
+#[unstable(feature = "async_drop", issue = "none")]
+impl<T: ?Sized> Future for AsyncDropInPlace<T> {
+    type Output = ();
+
+    #[inline(always)]
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        // SAFETY: This code simply forwards poll call to the inner future
+        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }.poll(cx)
+    }
+}
+
+// FIXME(zetanumbers): Add same restrictions on AsyncDrop impls as
+//   with Drop impls
+/// Custom code within the asynchronous destructor.
+#[unstable(feature = "async_drop", issue = "none")]
+#[lang = "async_drop"]
+pub trait AsyncDrop {
+    /// A future returned by the [`AsyncDrop::async_drop`] to be part
+    /// of the async destructor.
+    #[unstable(feature = "async_drop", issue = "none")]
+    type Dropper<'a>: Future<Output = ()>
+    where
+        Self: 'a;
+
+    /// Constructs the asynchronous destructor for this type.
+    #[unstable(feature = "async_drop", issue = "none")]
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_>;
+}
+
+#[lang = "async_destruct"]
+#[rustc_deny_explicit_impl(implement_via_object = false)]
+trait AsyncDestruct {
+    type AsyncDestructor: Future<Output = ()>;
+}
+
+/// Basically calls `AsyncDrop::async_drop` with pointer. Used to simplify
+/// generation of the code for `async_drop_in_place_raw`
+#[lang = "surface_async_drop_in_place"]
+async unsafe fn surface_async_drop_in_place<T: AsyncDrop + ?Sized>(ptr: *mut T) {
+    // SAFETY: We call this from async drop `async_drop_in_place_raw`
+    //   which has the same safety requirements
+    unsafe { <T as AsyncDrop>::async_drop(Pin::new_unchecked(&mut *ptr)).await }
+}
+
+/// Basically calls `Drop::drop` with pointer. Used to simplify generation
+/// of the code for `async_drop_in_place_raw`
+#[allow(drop_bounds)]
+#[lang = "async_drop_surface_drop_in_place"]
+async unsafe fn surface_drop_in_place<T: Drop + ?Sized>(ptr: *mut T) {
+    // SAFETY: We call this from async drop `async_drop_in_place_raw`
+    //   which has the same safety requirements
+    unsafe { crate::ops::fallback_surface_drop(&mut *ptr) }
+}
+
+/// Wraps a future to continue outputing `Poll::Ready(())` once after
+/// wrapped future completes by returning `Poll::Ready(())` on poll. This
+/// is useful for constructing async destructors to guarantee this
+/// "fuse" property
+struct Fuse<T> {
+    inner: Option<T>,
+}
+
+#[lang = "async_drop_fuse"]
+fn fuse<T>(inner: T) -> Fuse<T> {
+    Fuse { inner: Some(inner) }
+}
+
+impl<T> Future for Fuse<T>
+where
+    T: Future<Output = ()>,
+{
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        // SAFETY: pin projection into `self.inner`
+        unsafe {
+            let this = self.get_unchecked_mut();
+            if let Some(inner) = &mut this.inner {
+                ready!(Pin::new_unchecked(inner).poll(cx));
+                this.inner = None;
+            }
+        }
+        Poll::Ready(())
+    }
+}
+
+/// Async destructor for arrays and slices.
+#[lang = "async_drop_slice"]
+async unsafe fn slice<T>(s: *mut [T]) {
+    let len = s.len();
+    let ptr = s.as_mut_ptr();
+    for i in 0..len {
+        // SAFETY: we iterate over elements of `s` slice
+        unsafe { async_drop_in_place_raw(ptr.add(i)).await }
+    }
+}
+
+/// Construct a chain of two futures, which awaits them sequentially as
+/// a future.
+#[lang = "async_drop_chain"]
+async fn chain<F, G>(first: F, last: G)
+where
+    F: IntoFuture<Output = ()>,
+    G: IntoFuture<Output = ()>,
+{
+    first.await;
+    last.await;
+}
+
+/// Basically a lazy version of `async_drop_in_place`. Returns a future
+/// that would call `AsyncDrop::async_drop` on a first poll.
+///
+/// # Safety
+///
+/// Same as `async_drop_in_place` except is lazy to avoid creating
+/// multiple mutable refernces.
+#[lang = "async_drop_defer"]
+async unsafe fn defer<T: ?Sized>(to_drop: *mut T) {
+    // SAFETY: same safety requirements as `async_drop_in_place`
+    unsafe { async_drop_in_place(to_drop) }.await
+}
+
+/// If `T`'s discriminant is equal to the stored one then awaits `M`
+/// otherwise awaits the `O`.
+///
+/// # Safety
+///
+/// User should carefully manage returned future, since it would
+/// try creating an immutable referece from `this` and get pointee's
+/// discriminant.
+// FIXME(zetanumbers): Send and Sync impls
+#[lang = "async_drop_either"]
+async unsafe fn either<O: IntoFuture<Output = ()>, M: IntoFuture<Output = ()>, T>(
+    other: O,
+    matched: M,
+    this: *mut T,
+    discr: <T as DiscriminantKind>::Discriminant,
+) {
+    // SAFETY: Guaranteed by the safety section of this funtion's documentation
+    if unsafe { discriminant_value(&*this) } == discr {
+        drop(other);
+        matched.await
+    } else {
+        drop(matched);
+        other.await
+    }
+}
+
+/// Used for noop async destructors. We don't use [`core::future::Ready`]
+/// because it panics after its second poll, which could be potentially
+/// bad if that would happen during the cleanup.
+#[derive(Clone, Copy)]
+struct Noop;
+
+#[lang = "async_drop_noop"]
+fn noop() -> Noop {
+    Noop
+}
+
+impl Future for Noop {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
+        Poll::Ready(())
+    }
+}
diff --git a/library/core/src/future/mod.rs b/library/core/src/future/mod.rs
index 0f77a2d83433f..c3bd18e30aa49 100644
--- a/library/core/src/future/mod.rs
+++ b/library/core/src/future/mod.rs
@@ -12,6 +12,8 @@
 use crate::ptr::NonNull;
 use crate::task::Context;
 
+#[cfg(not(bootstrap))]
+mod async_drop;
 mod future;
 mod into_future;
 mod join;
@@ -36,6 +38,10 @@ pub use ready::{ready, Ready};
 #[stable(feature = "future_poll_fn", since = "1.64.0")]
 pub use poll_fn::{poll_fn, PollFn};
 
+#[cfg(not(bootstrap))]
+#[unstable(feature = "async_drop", issue = "none")]
+pub use async_drop::{async_drop, async_drop_in_place, AsyncDrop, AsyncDropInPlace};
+
 /// This type is needed because:
 ///
 /// a) Coroutines cannot implement `for<'a, 'b> Coroutine<&'a mut Context<'b>>`, so we need to pass
diff --git a/library/core/src/ops/drop.rs b/library/core/src/ops/drop.rs
index 34dfa9e4c51fc..1325d90e4f345 100644
--- a/library/core/src/ops/drop.rs
+++ b/library/core/src/ops/drop.rs
@@ -238,3 +238,11 @@ pub trait Drop {
     #[stable(feature = "rust1", since = "1.0.0")]
     fn drop(&mut self);
 }
+
+/// Fallback function to call surface level `Drop::drop` function
+#[cfg(not(bootstrap))]
+#[allow(drop_bounds)]
+#[lang = "fallback_surface_drop"]
+pub(crate) fn fallback_surface_drop<T: Drop + ?Sized>(x: &mut T) {
+    <T as Drop>::drop(x)
+}
diff --git a/library/core/src/ops/mod.rs b/library/core/src/ops/mod.rs
index ac808bec50ece..81d5e5c949ec5 100644
--- a/library/core/src/ops/mod.rs
+++ b/library/core/src/ops/mod.rs
@@ -174,6 +174,9 @@ pub use self::deref::Receiver;
 #[stable(feature = "rust1", since = "1.0.0")]
 pub use self::drop::Drop;
 
+#[cfg(not(bootstrap))]
+pub(crate) use self::drop::fallback_surface_drop;
+
 #[stable(feature = "rust1", since = "1.0.0")]
 pub use self::function::{Fn, FnMut, FnOnce};
 
diff --git a/src/tools/miri/tests/pass/async-drop.rs b/src/tools/miri/tests/pass/async-drop.rs
new file mode 100644
index 0000000000000..f16206f3db62d
--- /dev/null
+++ b/src/tools/miri/tests/pass/async-drop.rs
@@ -0,0 +1,191 @@
+//@revisions: stack tree
+//@compile-flags: -Zmiri-strict-provenance
+//@[tree]compile-flags: -Zmiri-tree-borrows
+#![feature(async_drop, impl_trait_in_assoc_type, noop_waker, async_closure)]
+#![allow(incomplete_features, dead_code)]
+
+// FIXME(zetanumbers): consider AsyncDestruct::async_drop cleanup tests
+use core::future::{async_drop_in_place, AsyncDrop, Future};
+use core::hint::black_box;
+use core::mem::{self, ManuallyDrop};
+use core::pin::{pin, Pin};
+use core::task::{Context, Poll, Waker};
+
+async fn test_async_drop<T>(x: T) {
+    let mut x = mem::MaybeUninit::new(x);
+    let dtor = pin!(unsafe { async_drop_in_place(x.as_mut_ptr()) });
+    test_idempotency(dtor).await;
+}
+
+fn test_idempotency<T>(mut x: Pin<&mut T>) -> impl Future<Output = ()> + '_
+where
+    T: Future<Output = ()>,
+{
+    core::future::poll_fn(move |cx| {
+        assert_eq!(x.as_mut().poll(cx), Poll::Ready(()));
+        assert_eq!(x.as_mut().poll(cx), Poll::Ready(()));
+        Poll::Ready(())
+    })
+}
+
+fn main() {
+    let waker = Waker::noop();
+    let mut cx = Context::from_waker(&waker);
+
+    let i = 13;
+    let fut = pin!(async {
+        test_async_drop(Int(0)).await;
+        test_async_drop(AsyncInt(0)).await;
+        test_async_drop([AsyncInt(1), AsyncInt(2)]).await;
+        test_async_drop((AsyncInt(3), AsyncInt(4))).await;
+        test_async_drop(5).await;
+        let j = 42;
+        test_async_drop(&i).await;
+        test_async_drop(&j).await;
+        test_async_drop(AsyncStruct { b: AsyncInt(8), a: AsyncInt(7), i: 6 }).await;
+        test_async_drop(ManuallyDrop::new(AsyncInt(9))).await;
+
+        let foo = AsyncInt(10);
+        test_async_drop(AsyncReference { foo: &foo }).await;
+
+        let foo = AsyncInt(11);
+        test_async_drop(|| {
+            black_box(foo);
+            let foo = AsyncInt(10);
+            foo
+        })
+        .await;
+
+        test_async_drop(AsyncEnum::A(AsyncInt(12))).await;
+        test_async_drop(AsyncEnum::B(SyncInt(13))).await;
+
+        test_async_drop(SyncInt(14)).await;
+        test_async_drop(SyncThenAsync { i: 15, a: AsyncInt(16), b: SyncInt(17), c: AsyncInt(18) })
+            .await;
+
+        let async_drop_fut = pin!(core::future::async_drop(AsyncInt(19)));
+        test_idempotency(async_drop_fut).await;
+
+        let foo = AsyncInt(20);
+        test_async_drop(async || {
+            black_box(foo);
+            let foo = AsyncInt(19);
+            // Await point there, but this is async closure so it's fine
+            black_box(core::future::ready(())).await;
+            foo
+        })
+        .await;
+
+        test_async_drop(AsyncUnion { signed: 21 }).await;
+    });
+    let res = fut.poll(&mut cx);
+    assert_eq!(res, Poll::Ready(()));
+}
+
+struct AsyncInt(i32);
+
+impl AsyncDrop for AsyncInt {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncInt::Dropper::poll: {}", self.0);
+        }
+    }
+}
+
+struct SyncInt(i32);
+
+impl Drop for SyncInt {
+    fn drop(&mut self) {
+        println!("SyncInt::drop: {}", self.0);
+    }
+}
+
+struct SyncThenAsync {
+    i: i32,
+    a: AsyncInt,
+    b: SyncInt,
+    c: AsyncInt,
+}
+
+impl Drop for SyncThenAsync {
+    fn drop(&mut self) {
+        println!("SyncThenAsync::drop: {}", self.i);
+    }
+}
+
+struct AsyncReference<'a> {
+    foo: &'a AsyncInt,
+}
+
+impl AsyncDrop for AsyncReference<'_> {
+    type Dropper<'a> = impl Future<Output = ()> where Self: 'a;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncReference::Dropper::poll: {}", self.foo.0);
+        }
+    }
+}
+
+struct Int(i32);
+
+struct AsyncStruct {
+    i: i32,
+    a: AsyncInt,
+    b: AsyncInt,
+}
+
+impl AsyncDrop for AsyncStruct {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncStruct::Dropper::poll: {}", self.i);
+        }
+    }
+}
+
+enum AsyncEnum {
+    A(AsyncInt),
+    B(SyncInt),
+}
+
+impl AsyncDrop for AsyncEnum {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(mut self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            let new_self = match &*self {
+                AsyncEnum::A(foo) => {
+                    println!("AsyncEnum(A)::Dropper::poll: {}", foo.0);
+                    AsyncEnum::B(SyncInt(foo.0))
+                }
+                AsyncEnum::B(foo) => {
+                    println!("AsyncEnum(B)::Dropper::poll: {}", foo.0);
+                    AsyncEnum::A(AsyncInt(foo.0))
+                }
+            };
+            mem::forget(mem::replace(&mut *self, new_self));
+        }
+    }
+}
+
+// FIXME(zetanumbers): Disallow types with `AsyncDrop` in unions
+union AsyncUnion {
+    signed: i32,
+    unsigned: u32,
+}
+
+impl AsyncDrop for AsyncUnion {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncUnion::Dropper::poll: {}, {}", unsafe { self.signed }, unsafe {
+                self.unsigned
+            });
+        }
+    }
+}
diff --git a/src/tools/miri/tests/pass/async-drop.stack.stdout b/src/tools/miri/tests/pass/async-drop.stack.stdout
new file mode 100644
index 0000000000000..9cae4331caf92
--- /dev/null
+++ b/src/tools/miri/tests/pass/async-drop.stack.stdout
@@ -0,0 +1,22 @@
+AsyncInt::Dropper::poll: 0
+AsyncInt::Dropper::poll: 1
+AsyncInt::Dropper::poll: 2
+AsyncInt::Dropper::poll: 3
+AsyncInt::Dropper::poll: 4
+AsyncStruct::Dropper::poll: 6
+AsyncInt::Dropper::poll: 7
+AsyncInt::Dropper::poll: 8
+AsyncReference::Dropper::poll: 10
+AsyncInt::Dropper::poll: 11
+AsyncEnum(A)::Dropper::poll: 12
+SyncInt::drop: 12
+AsyncEnum(B)::Dropper::poll: 13
+AsyncInt::Dropper::poll: 13
+SyncInt::drop: 14
+SyncThenAsync::drop: 15
+AsyncInt::Dropper::poll: 16
+SyncInt::drop: 17
+AsyncInt::Dropper::poll: 18
+AsyncInt::Dropper::poll: 19
+AsyncInt::Dropper::poll: 20
+AsyncUnion::Dropper::poll: 21, 21
diff --git a/src/tools/miri/tests/pass/async-drop.tree.stdout b/src/tools/miri/tests/pass/async-drop.tree.stdout
new file mode 100644
index 0000000000000..9cae4331caf92
--- /dev/null
+++ b/src/tools/miri/tests/pass/async-drop.tree.stdout
@@ -0,0 +1,22 @@
+AsyncInt::Dropper::poll: 0
+AsyncInt::Dropper::poll: 1
+AsyncInt::Dropper::poll: 2
+AsyncInt::Dropper::poll: 3
+AsyncInt::Dropper::poll: 4
+AsyncStruct::Dropper::poll: 6
+AsyncInt::Dropper::poll: 7
+AsyncInt::Dropper::poll: 8
+AsyncReference::Dropper::poll: 10
+AsyncInt::Dropper::poll: 11
+AsyncEnum(A)::Dropper::poll: 12
+SyncInt::drop: 12
+AsyncEnum(B)::Dropper::poll: 13
+AsyncInt::Dropper::poll: 13
+SyncInt::drop: 14
+SyncThenAsync::drop: 15
+AsyncInt::Dropper::poll: 16
+SyncInt::drop: 17
+AsyncInt::Dropper::poll: 18
+AsyncInt::Dropper::poll: 19
+AsyncInt::Dropper::poll: 20
+AsyncUnion::Dropper::poll: 21, 21
diff --git a/tests/ui/async-await/async-drop.rs b/tests/ui/async-await/async-drop.rs
new file mode 100644
index 0000000000000..6d02dcebc0b65
--- /dev/null
+++ b/tests/ui/async-await/async-drop.rs
@@ -0,0 +1,197 @@
+//@ run-pass
+//@ check-run-results
+
+#![feature(async_drop, impl_trait_in_assoc_type, noop_waker, async_closure)]
+#![allow(incomplete_features, dead_code)]
+
+//@ edition: 2021
+
+// FIXME(zetanumbers): consider AsyncDestruct::async_drop cleanup tests
+use core::future::{async_drop_in_place, AsyncDrop, Future};
+use core::hint::black_box;
+use core::mem::{self, ManuallyDrop};
+use core::pin::{pin, Pin};
+use core::task::{Context, Poll, Waker};
+
+async fn test_async_drop<T>(x: T) {
+    let mut x = mem::MaybeUninit::new(x);
+    let dtor = pin!(unsafe { async_drop_in_place(x.as_mut_ptr()) });
+    test_idempotency(dtor).await;
+}
+
+fn test_idempotency<T>(mut x: Pin<&mut T>) -> impl Future<Output = ()> + '_
+where
+    T: Future<Output = ()>,
+{
+    core::future::poll_fn(move |cx| {
+        assert_eq!(x.as_mut().poll(cx), Poll::Ready(()));
+        assert_eq!(x.as_mut().poll(cx), Poll::Ready(()));
+        Poll::Ready(())
+    })
+}
+
+fn main() {
+    let waker = Waker::noop();
+    let mut cx = Context::from_waker(&waker);
+
+    let i = 13;
+    let fut = pin!(async {
+        test_async_drop(Int(0)).await;
+        test_async_drop(AsyncInt(0)).await;
+        test_async_drop([AsyncInt(1), AsyncInt(2)]).await;
+        test_async_drop((AsyncInt(3), AsyncInt(4))).await;
+        test_async_drop(5).await;
+        let j = 42;
+        test_async_drop(&i).await;
+        test_async_drop(&j).await;
+        test_async_drop(AsyncStruct { b: AsyncInt(8), a: AsyncInt(7), i: 6 }).await;
+        test_async_drop(ManuallyDrop::new(AsyncInt(9))).await;
+
+        let foo = AsyncInt(10);
+        test_async_drop(AsyncReference { foo: &foo }).await;
+
+        let foo = AsyncInt(11);
+        test_async_drop(|| {
+            black_box(foo);
+            let foo = AsyncInt(10);
+            foo
+        }).await;
+
+        test_async_drop(AsyncEnum::A(AsyncInt(12))).await;
+        test_async_drop(AsyncEnum::B(SyncInt(13))).await;
+
+        test_async_drop(SyncInt(14)).await;
+        test_async_drop(SyncThenAsync {
+            i: 15,
+            a: AsyncInt(16),
+            b: SyncInt(17),
+            c: AsyncInt(18),
+        }).await;
+
+        let async_drop_fut = pin!(core::future::async_drop(AsyncInt(19)));
+        test_idempotency(async_drop_fut).await;
+
+        let foo = AsyncInt(20);
+        test_async_drop(async || {
+            black_box(foo);
+            let foo = AsyncInt(19);
+            // Await point there, but this is async closure so it's fine
+            black_box(core::future::ready(())).await;
+            foo
+        }).await;
+
+        test_async_drop(AsyncUnion { signed: 21 }).await;
+    });
+    let res = fut.poll(&mut cx);
+    assert_eq!(res, Poll::Ready(()));
+}
+
+struct AsyncInt(i32);
+
+impl AsyncDrop for AsyncInt {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncInt::Dropper::poll: {}", self.0);
+        }
+    }
+}
+
+struct SyncInt(i32);
+
+impl Drop for SyncInt {
+    fn drop(&mut self) {
+        println!("SyncInt::drop: {}", self.0);
+    }
+}
+
+struct SyncThenAsync {
+    i: i32,
+    a: AsyncInt,
+    b: SyncInt,
+    c: AsyncInt,
+}
+
+impl Drop for SyncThenAsync {
+    fn drop(&mut self) {
+        println!("SyncThenAsync::drop: {}", self.i);
+    }
+}
+
+struct AsyncReference<'a> {
+    foo: &'a AsyncInt,
+}
+
+impl AsyncDrop for AsyncReference<'_> {
+    type Dropper<'a> = impl Future<Output = ()> where Self: 'a;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncReference::Dropper::poll: {}", self.foo.0);
+        }
+    }
+}
+
+struct Int(i32);
+
+struct AsyncStruct {
+    i: i32,
+    a: AsyncInt,
+    b: AsyncInt,
+}
+
+impl AsyncDrop for AsyncStruct {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!("AsyncStruct::Dropper::poll: {}", self.i);
+        }
+    }
+}
+
+enum AsyncEnum {
+    A(AsyncInt),
+    B(SyncInt),
+}
+
+impl AsyncDrop for AsyncEnum {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(mut self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            let new_self = match &*self {
+                AsyncEnum::A(foo) => {
+                    println!("AsyncEnum(A)::Dropper::poll: {}", foo.0);
+                    AsyncEnum::B(SyncInt(foo.0))
+                }
+                AsyncEnum::B(foo) => {
+                    println!("AsyncEnum(B)::Dropper::poll: {}", foo.0);
+                    AsyncEnum::A(AsyncInt(foo.0))
+                }
+            };
+            mem::forget(mem::replace(&mut *self, new_self));
+        }
+    }
+}
+
+// FIXME(zetanumbers): Disallow types with `AsyncDrop` in unions
+union AsyncUnion {
+    signed: i32,
+    unsigned: u32,
+}
+
+impl AsyncDrop for AsyncUnion {
+    type Dropper<'a> = impl Future<Output = ()>;
+
+    fn async_drop(self: Pin<&mut Self>) -> Self::Dropper<'_> {
+        async move {
+            println!(
+                "AsyncUnion::Dropper::poll: {}, {}",
+                unsafe { self.signed },
+                unsafe { self.unsigned },
+            );
+        }
+    }
+}
diff --git a/tests/ui/async-await/async-drop.run.stdout b/tests/ui/async-await/async-drop.run.stdout
new file mode 100644
index 0000000000000..9cae4331caf92
--- /dev/null
+++ b/tests/ui/async-await/async-drop.run.stdout
@@ -0,0 +1,22 @@
+AsyncInt::Dropper::poll: 0
+AsyncInt::Dropper::poll: 1
+AsyncInt::Dropper::poll: 2
+AsyncInt::Dropper::poll: 3
+AsyncInt::Dropper::poll: 4
+AsyncStruct::Dropper::poll: 6
+AsyncInt::Dropper::poll: 7
+AsyncInt::Dropper::poll: 8
+AsyncReference::Dropper::poll: 10
+AsyncInt::Dropper::poll: 11
+AsyncEnum(A)::Dropper::poll: 12
+SyncInt::drop: 12
+AsyncEnum(B)::Dropper::poll: 13
+AsyncInt::Dropper::poll: 13
+SyncInt::drop: 14
+SyncThenAsync::drop: 15
+AsyncInt::Dropper::poll: 16
+SyncInt::drop: 17
+AsyncInt::Dropper::poll: 18
+AsyncInt::Dropper::poll: 19
+AsyncInt::Dropper::poll: 20
+AsyncUnion::Dropper::poll: 21, 21