Skip to content

Commit

Permalink
finagle-core: Rename c.t.f.context.Retries to c.t.f.context.Requeues
Browse files Browse the repository at this point in the history
Problem

`c.t.f.context.Retries` is not an accurate name because it actually stores the
number of *requeues* a request has had.

Solution

Rename it to `c.t.f.context.Requeues`.

Differential Revision: https://phabricator.twitter.biz/D1104878
  • Loading branch information
jcrossley authored and jenkins committed Nov 1, 2023
1 parent aa73d41 commit 3c27308
Show file tree
Hide file tree
Showing 14 changed files with 159 additions and 110 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ Bug Fixes

* finagle-core: Failed recvAddress on Linux due to the remote peer resetting connection should now
be properly seen as a `c.t.f.ChannelClosedException` instead of a
`c.t.f.UnknownChannelException`. ``PHAB_ID=`D1104650`
`c.t.f.UnknownChannelException`. ``PHAB_ID=`D1104650``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-core: The `c.t.f.context.Retries` context has been renamed to `c.t.f.context.Requeues` to reflect what it
actually contains -- the number of requeues a request has had (on the client immediately upstream). Requeues are
retries on write exceptions (i.e. the original request was never sent to the server). ``PHAB_ID=`D1104878``

* finagle: Deposit budget once in MethodBuilder ``PHAB_ID=D1107653``

Expand Down
9 changes: 5 additions & 4 deletions doc/src/sphinx/Contexts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ Current request deadline
``com.twitter.finagle.context.Deadline.current`` —
A broadcast ``Context`` that represents when the request should be completed by.

Current retry attempt
Current requeue attempt
~~~~~~~~~~~~~~~~~~~~~

``com.twitter.finagle.context.Retries.current`` —
A broadcast ``Context`` that represents which retry attempt this request is.
Will have ``attempt`` set to 0 if the request is not a retry.
``com.twitter.finagle.context.Requeues.current`` —
A broadcast ``Context`` that represents which requeue attempt this request is.
Requeues are retries on write exceptions (i.e. the original request was never sent to the server).
Will have ``attempt`` set to 0 if the request is not a requeue.

Current TLS session
~~~~~~~~~~~~~~~~~~~
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.Contexts
import com.twitter.finagle.http.{HeaderMap, Message}
import com.twitter.finagle.http.HeaderMap
import com.twitter.finagle.http.Message
import com.twitter.finagle.util.LoadService
import com.twitter.logging.{Level, Logger}
import com.twitter.util.{Return, Throw, Try}
import com.twitter.logging.Level
import com.twitter.logging.Logger
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try
import scala.collection.mutable
import scala.util.control.NonFatal

Expand Down Expand Up @@ -47,7 +51,7 @@ object HttpContext {
private[this] val knownContextTypes: Array[HttpContext] = {
Array[HttpContext](
HttpDeadline,
HttpRetries,
HttpRequeues,
HttpBackupRequest
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.{Contexts, Retries}
import com.twitter.util.{Return, Throw, Try}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Requeues
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try
import scala.util.control.NonFatal

private object HttpRetries extends HttpContext {
private object HttpRequeues extends HttpContext {

type ContextKeyType = Retries
val key: Contexts.broadcast.Key[Retries] = Retries
type ContextKeyType = Requeues
val key: Contexts.broadcast.Key[Requeues] = Requeues

def toHeader(retries: Retries): String = {
retries.attempt.toString
def toHeader(requeues: Requeues): String = {
requeues.attempt.toString
}

def fromHeader(header: String): Try[Retries] = {
def fromHeader(header: String): Try[Requeues] = {
try {
Return(Retries(header.toInt))
Return(Requeues(header.toInt))
} catch {
case NonFatal(e) => Throw(new NumberFormatException)
case NonFatal(_) => Throw(new NumberFormatException)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.twitter.finagle.http.codec.context

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.context.{BackupRequest, Contexts, Deadline, Retries}
import com.twitter.finagle.http.{Message, Method, Request, Version}
import com.twitter.finagle.context.BackupRequest
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.context.Requeues
import com.twitter.finagle.http.Message
import com.twitter.finagle.http.Method
import com.twitter.finagle.http.Request
import com.twitter.finagle.http.Version
import com.twitter.io.Buf
import com.twitter.util.Try
import org.scalatest.funsuite.AnyFunSuite
Expand Down Expand Up @@ -89,48 +95,48 @@ class HttpContextTest extends AnyFunSuite {
}
}

test("written request retries matches read request retries") {
test("written request requeues matches read request requeues") {
val m = newMsg()
val writtenRetries = Retries(5)
Contexts.broadcast.let(Retries, writtenRetries) {
val writtenRequeues = Requeues(5)
Contexts.broadcast.let(Requeues, writtenRequeues) {
HttpContext.write(m)

// Clear Retries in the Context
Contexts.broadcast.letClear(Retries) {
// Clear Requeues in the Context
Contexts.broadcast.letClear(Requeues) {

// Ensure the Retries was cleared
assert(Retries.current == None)
// Ensure the Requeues was cleared
assert(Requeues.current == None)

HttpContext.read(m) {
val readRetries = Retries.current.get
assert(writtenRetries == readRetries)
val readRequeues = Requeues.current.get
assert(writtenRequeues == readRequeues)
}
}
}
}

test("headers are set/replaced, not added") {
val m = newMsg()
Contexts.broadcast.let(Retries, Retries(5)) {
Contexts.broadcast.let(Requeues, Requeues(5)) {
HttpContext.write(m)
}

assert(m.headerMap.getAll(HttpRetries.headerKey).size == 1)
assert(m.headerMap.getAll(HttpRequeues.headerKey).size == 1)
HttpContext.read(m) {
assert(Contexts.broadcast.get(HttpRetries.key) == Some(Retries(5)))
assert(Contexts.broadcast.get(HttpRequeues.key) == Some(Requeues(5)))
}

// and again...
Contexts.broadcast.let(Retries, Retries(4)) {
Contexts.broadcast.let(Requeues, Requeues(4)) {
HttpContext.write(m)
}

// Still just 1...
assert(m.headerMap.getAll(HttpRetries.headerKey).size == 1)
assert(m.headerMap.getAll(HttpRequeues.headerKey).size == 1)

// Should be the last entry written
HttpContext.read(m) {
assert(Contexts.broadcast.get(HttpRetries.key) == Some(Retries(4)))
assert(Contexts.broadcast.get(HttpRequeues.key) == Some(Requeues(4)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,43 @@ package com.twitter.finagle.http.filter

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.Service
import com.twitter.finagle.context.{Contexts, Deadline, Retries}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.context.Requeues
import com.twitter.finagle.http.codec.context.HttpContext
import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.util.{Await, Future}
import com.twitter.finagle.http.Request
import com.twitter.finagle.http.Response
import com.twitter.finagle.http.Status
import com.twitter.util.Await
import com.twitter.util.Future
import org.scalatest.funsuite.AnyFunSuite

class ContextFilterTest extends AnyFunSuite {

test("parses Finagle-Ctx headers") {
val writtenDeadline = Deadline.ofTimeout(5.seconds)
val writtenRetries = Retries(5)
val writtenRequeues = Requeues(5)
val service =
new ClientContextFilter[Request, Response] andThen
new ServerContextFilter[Request, Response] andThen
Service.mk[Request, Response] { req =>
assert(Deadline.current.get == writtenDeadline)
assert(Retries.current.get == writtenRetries)
assert(Requeues.current.get == writtenRequeues)
Future.value(Response())
}

Contexts.broadcast.let(Deadline, writtenDeadline) {
Contexts.broadcast.let(Retries, writtenRetries) {
Contexts.broadcast.let(Requeues, writtenRequeues) {
val req = Request()
HttpContext.write(req)

// Clear the deadline/retries values in the context
// Clear the deadline/requeues values in the context
Contexts.broadcast.letClearAll {
// ensure the deadline was cleared
assert(Deadline.current == None)

// ensure the retries was cleared
assert(Retries.current == None)
// ensure the requeues was cleared
assert(Requeues.current == None)

val rsp = Await.result(service(req))
assert(rsp.status == Status.Ok)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ object StackClient {
stk.push(PrepFactory.module)
stk.push(FactoryToService.module)
stk.push(Retries.moduleRequeueable)
stk.push(ClearContextValueFilter.module(context.Retries))
stk.push(ClearContextValueFilter.module(context.Requeues))
stk.push(ExceptionSourceFilter.module)

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.twitter.finagle.context

import com.twitter.io.Buf
import com.twitter.io.BufByteWriter
import com.twitter.io.ByteReader
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try

/**
* Requeues contains the number of times a request has been requeued.
*
* @param attempt which retry attempt this is. Will be 0
* if the request is not a requeue.
*/
case class Requeues(attempt: Int)

/**
* Note: The context id is "c.t.f.Retries" and not "c.t.f.Requeues" because we have renamed the
* Retries context to Requeues (to reflect what it actually contains, which is the requeues), but we
* don't want to break existing users/deployments using this context by changing the key.
*/
object Requeues extends Contexts.broadcast.Key[Requeues]("com.twitter.finagle.Retries") {

def current: Option[Requeues] =
Contexts.broadcast.get(Requeues)

override def marshal(requeues: Requeues): Buf = {
val bw = BufByteWriter.fixed(4)
bw.writeIntBE(requeues.attempt)
bw.owned()
}

override def tryUnmarshal(buf: Buf): Try[Requeues] = {
if (buf.length != 4) {
Throw(
new IllegalArgumentException(
s"Could not extract Requeues from Buf. Length ${buf.length} but required 4"
)
)
} else {
val attempt: Int = ByteReader(buf).readIntBE()
Return(Requeues(attempt))
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package com.twitter.finagle.service

import com.twitter.finagle.context.Contexts
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.{Annotation, Trace, TraceId}
import com.twitter.finagle.{context, _}
import com.twitter.finagle.tracing.Annotation
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.tracing.TraceId
import com.twitter.finagle.context
import com.twitter.finagle._
import com.twitter.util._

/**
Expand Down Expand Up @@ -66,7 +69,7 @@ private[finagle] class RequeueFilter[Req, Rep](
retriesRemaining: Int,
backoffs: Backoff
): Future[Rep] = {
Contexts.broadcast.let(context.Retries, context.Retries(attempt)) {
Contexts.broadcast.let(context.Requeues, context.Requeues(attempt)) {
val trace = Trace()
val shouldTrace = attempt > 0 && trace.isActivelyTracing
if (shouldTrace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package com.twitter.finagle.filter

import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.context.{Contexts, Retries}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Requeues
import com.twitter.finagle.stack.nilStack
import com.twitter.util.{Promise, Await, Future}
import com.twitter.util.Promise
import com.twitter.util.Await
import com.twitter.util.Future
import org.scalatest.funsuite.AnyFunSuite

class ClearContextValueFilterTest extends AnyFunSuite {
Expand All @@ -20,16 +23,16 @@ class ClearContextValueFilterTest extends AnyFunSuite {
def make(next: ServiceFactory[Unit, Unit]) =
new SimpleFilter[Unit, Unit] {
def apply(req: Unit, service: Service[Unit, Unit]): Future[Unit] = {
Contexts.broadcast.let(Retries, Retries(5)) {
assert(Retries.current == Some(Retries(5)))
Contexts.broadcast.let(Requeues, Requeues(5)) {
assert(Requeues.current == Some(Requeues(5)))
setContextFilterCalled.setDone()
service(req)
}
}
}.andThen(next)
}

val clearContextFilter = ClearContextValueFilter.module[Unit, Unit](Retries)
val clearContextFilter = ClearContextValueFilter.module[Unit, Unit](Requeues)

val verifyContextClearedFilter =
new Stack.Module0[ServiceFactory[Unit, Unit]] {
Expand All @@ -38,7 +41,7 @@ class ClearContextValueFilterTest extends AnyFunSuite {
def make(next: ServiceFactory[Unit, Unit]) =
new SimpleFilter[Unit, Unit] {
def apply(req: Unit, service: Service[Unit, Unit]): Future[Unit] = {
assert(Retries.current == None)
assert(Requeues.current == None)
verifyContextClearedFilterCalled.setDone()
service(req)
}
Expand Down
Loading

0 comments on commit 3c27308

Please sign in to comment.