Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class arrow/resilience/CircuitBreaker$State$Open : arrow/resilience
public fun getOpeningStrategy ()Larrow/resilience/CircuitBreaker$OpeningStrategy;
public final fun getResetTimeout-UwyO8pc ()J
public final fun getStartedAt ()Lkotlin/time/TimeMark;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ final class arrow.resilience/CircuitBreaker { // arrow.resilience/CircuitBreaker
final fun <get-startedAt>(): kotlin.time/TimeMark // arrow.resilience/CircuitBreaker.State.Open.startedAt.<get-startedAt>|<get-startedAt>(){}[0]

final fun equals(kotlin/Any?): kotlin/Boolean // arrow.resilience/CircuitBreaker.State.Open.equals|equals(kotlin.Any?){}[0]
final fun hashCode(): kotlin/Int // arrow.resilience/CircuitBreaker.State.Open.hashCode|hashCode(){}[0]
final fun toString(): kotlin/String // arrow.resilience/CircuitBreaker.State.Open.toString|toString(){}[0]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class arrow/resilience/CircuitBreaker$State$Open : arrow/resilience
public fun getOpeningStrategy ()Larrow/resilience/CircuitBreaker$OpeningStrategy;
public final fun getResetTimeout-UwyO8pc ()J
public final fun getStartedAt ()Lkotlin/time/TimeMark;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
package arrow.resilience

import arrow.atomic.Atomic
import arrow.atomic.update
import arrow.atomic.updateAndGet
import arrow.core.Either
import arrow.core.identity
import arrow.core.left
import arrow.core.nonFatalOrThrow
import arrow.core.right
import arrow.core.raise.catch
import arrow.resilience.CircuitBreaker.State.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
Expand All @@ -34,7 +33,7 @@ import kotlin.time.TimeSource
* 1. [Closed]: This is its normal state, where requests are being made. The state in which [CircuitBreaker] starts.
* - When an exception occurs it increments the failure counter
* - A successful request will reset the failure counter to zero
* - When the failure counter reaches the [maxFailures] threshold, the breaker is tripped into the [Open] state
* - When the failure counter reaches the [OpeningStrategy.Count.maxFailures] threshold, the breaker is tripped into the [Open] state
*
* 2. [Open]: The [CircuitBreaker] will short-circuit/fail-fast all requests
* - All requests short-circuit/fail-fast with `ExecutionRejected`
Expand Down Expand Up @@ -143,8 +142,25 @@ private constructor(
private val onHalfOpen: suspend () -> Unit,
private val onOpen: suspend () -> Unit
) {
private fun copy(
onRejected: suspend () -> Unit = this.onRejected,
onClosed: suspend () -> Unit = this.onClosed,
onHalfOpen: suspend () -> Unit = this.onHalfOpen,
onOpen: suspend () -> Unit = this.onOpen
) = CircuitBreaker(
state = state,
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
timeSource = timeSource,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
onOpen = onOpen,
)

/** Returns the current [State], meant for debugging purposes.*/
@Suppress("RedundantSuspendModifier")
public suspend fun state(): State = state.get()

/**
Expand All @@ -168,11 +184,7 @@ private constructor(
contract {
callsInPlace(fa, InvocationKind.AT_MOST_ONCE)
}
return try {
Either.Right(protectOrThrow(fa))
} catch (e: ExecutionRejected) {
Either.Left(e)
}
return Either.catchOrThrow<ExecutionRejected, _> { protectOrThrow(fa) }
}

/**
Expand All @@ -184,14 +196,7 @@ private constructor(
callsInPlace(fa, InvocationKind.EXACTLY_ONCE)
}
return when (val curr = state.get()) {
is Closed -> {
// This is markOrResetFailures(Either.catch { fa() }), but inlined to make the compiler happy with the contract
try {
markOrResetFailures(fa().right())
} catch (e: Throwable) {
markOrResetFailures(e.nonFatalOrThrow().left())
}
}
is Closed -> markOrResetFailures(fa)
is Open -> {
if (curr.expiresAt.hasPassedNow()) {
// The Open state has expired, so we are transition to HalfOpen and attempt to close the CircuitBreaker
Expand All @@ -200,59 +205,47 @@ private constructor(
} else {
// Open isn't expired, so we reject execution
val expiresInMillis = curr.expiresAt.elapsedNow().absoluteValue.inWholeMilliseconds
onRejected.invoke()
throw ExecutionRejected(
"Rejected because the CircuitBreaker is in the Open state, attempting to close in $expiresInMillis millis",
curr
)
curr.rejectExecution("Rejected because the CircuitBreaker is in the Open state, attempting to close in $expiresInMillis millis")
}
}

is HalfOpen -> {
// CircuitBreaker is in HalfOpen state, which means we still reject all tasks, while waiting to see if our attempt to close the CircuitBreaker succeeds or fails
onRejected.invoke()
throw ExecutionRejected("Rejected because the CircuitBreaker is in the HalfOpen state", curr)
curr.rejectExecution("Rejected because the CircuitBreaker is in the HalfOpen state")
}
}
}

/** Function for counting failures in the `Closed` state, triggering the `Open` state if necessary.*/
private tailrec suspend fun <A> markOrResetFailures(result: Either<Throwable, A>): A =
when (val curr = state.get()) {
is Closed -> {
when (result) {
is Either.Right -> {
val openingStrategy = state.get().openingStrategy
if (openingStrategy is OpeningStrategy.Count && openingStrategy.failuresCount == 0)
result.value
else {
if (!state.compareAndSet(curr, Closed(openingStrategy.resetFailuresCount()))) markOrResetFailures(result)
else result.value
}
}
private suspend fun State.rejectExecution(message: String): Nothing {
onRejected.invoke()
throw ExecutionRejected(message, this)
}

is Either.Left -> {
val currentOpeningStrategy = curr.openingStrategy.trackFailure(timeSource.markNow())
// In case of failure, we either increment the failures counter, or we transition in the `Open` state.
if (currentOpeningStrategy.shouldOpen()) {
// We've gone over the permitted failures threshold, so we need to open the circuit breaker
val update = Open(currentOpeningStrategy, timeSource.markNow(), resetTimeout, CompletableDeferred())
if (!state.compareAndSet(curr, update)) markOrResetFailures<A>(result)
else {
onOpen.invoke()
throw result.value
}
} else {
// It's fine, just increment the failures count
if (!state.compareAndSet(curr, Closed(currentOpeningStrategy))) markOrResetFailures<A>(result)
else throw result.value
}
}
}
private suspend fun <A> markOrResetFailures(fa: suspend () -> A): A {
contract {
callsInPlace(fa, InvocationKind.EXACTLY_ONCE)
}
// the `return` is here to make the compiler happy about the contract
catch({
val result = fa()
state.update {
if (it !is Closed) return result
Closed(it.openingStrategy.resetFailuresCount())
}

else -> result.fold({ throw it }, ::identity)
return result
}) { error ->
val newState = state.updateAndGet {
if (it !is Closed) throw error
val newStrategy = it.openingStrategy.trackFailure(timeSource.markNow())
if (newStrategy.shouldOpen()) {
// We've gone over the permitted failures threshold, so we need to open the circuit breaker
Open(newStrategy, timeSource.markNow(), resetTimeout, CompletableDeferred())
} else Closed(newStrategy)// It's fine, just increment the failures count
}
if (newState is Open) onOpen.invoke()
throw error
}
}

/** Internal function that is the handler for the reset attempt when the circuit breaker is in `HalfOpen`.
* In this state we can either transition to `Closed` in case the attempt was successful, or to `Open` again, in case the attempt failed.
Expand Down Expand Up @@ -310,17 +303,7 @@ private constructor(
* @return a new circuit breaker wrapping the state of the source.
*/
public fun doOnRejectedTask(callback: suspend () -> Unit): CircuitBreaker =
CircuitBreaker(
state = state,
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
onRejected = suspend { onRejected.invoke(); callback.invoke() },
timeSource = timeSource,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
onOpen = onOpen
)
copy(onRejected = suspend { onRejected.invoke(); callback.invoke() })

/** Returns a new circuit breaker that wraps the state of the source
* and that will fire the given callback upon the circuit breaker
Expand All @@ -336,17 +319,7 @@ private constructor(
* @return a new circuit breaker wrapping the state of the source.
*/
public fun doOnClosed(callback: suspend () -> Unit): CircuitBreaker =
CircuitBreaker(
state = state,
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
onRejected = onRejected,
timeSource = timeSource,
onClosed = suspend { onClosed.invoke(); callback.invoke(); },
onHalfOpen = onHalfOpen,
onOpen = onOpen
)
copy(onClosed = suspend { onClosed.invoke(); callback.invoke(); })

/** Returns a new circuit breaker that wraps the state of the source
* and that will fire the given callback upon the circuit breaker
Expand All @@ -362,17 +335,7 @@ private constructor(
* @return a new circuit breaker wrapping the state of the source
*/
public fun doOnHalfOpen(callback: suspend () -> Unit): CircuitBreaker =
CircuitBreaker(
state = state,
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
timeSource = timeSource,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = { onHalfOpen.invoke(); callback.invoke() },
onOpen = onOpen
)
copy(onHalfOpen = { onHalfOpen.invoke(); callback.invoke() })

/** Returns a new circuit breaker that wraps the state of the source
* and that will fire the given callback upon the circuit breaker
Expand All @@ -388,17 +351,7 @@ private constructor(
* @return a new circuit breaker wrapping the state of the source
*/
public fun doOnOpen(callback: suspend () -> Unit): CircuitBreaker =
CircuitBreaker(
state = state,
resetTimeout = resetTimeout,
exponentialBackoffFactor = exponentialBackoffFactor,
maxResetTimeout = maxResetTimeout,
timeSource = timeSource,
onRejected = onRejected,
onClosed = onClosed,
onHalfOpen = onHalfOpen,
onOpen = { onOpen.invoke(); callback.invoke() }
)
copy(onOpen = { onOpen.invoke(); callback.invoke() })

/**
* The initial state when initializing a [CircuitBreaker] is [Closed].
Expand All @@ -415,7 +368,7 @@ private constructor(
* [Closed] is the normal state of the [CircuitBreaker], where requests are being made. The state in which [CircuitBreaker] starts.
* - When an exception occurs it increments the failure counter
* - A successful request will reset the failure counter to zero
* - When the failure counter reaches the [maxFailures] threshold, the breaker is tripped into the [Open] state
* - When the failure counter reaches the [OpeningStrategy.Count.maxFailures] threshold, the breaker is tripped into the [Open] state
*
* @param openingStrategy is the strategy that will decide if the circuit breaker should open after some failures.
*/
Expand Down Expand Up @@ -448,8 +401,9 @@ private constructor(
public val expiresAt: TimeMark = startedAt + resetTimeout

override fun equals(other: Any?): Boolean =
if (other is Open) this.startedAt == startedAt && this.resetTimeout == resetTimeout
else false
other is Open && this.startedAt == other.startedAt && this.resetTimeout == other.resetTimeout

override fun hashCode(): Int = 31 * startedAt.hashCode() + resetTimeout.hashCode()

override fun toString(): String =
"CircuitBreaker.State.Open(startedAt=$startedAt, resetTimeoutNanos=$resetTimeout, expiresAt=$expiresAt)"
Expand Down Expand Up @@ -505,7 +459,7 @@ private constructor(
*
* @param onRejected is a callback for signaling rejected tasks, so
* every time a task execution is attempted and rejected in
* [CircuitBreaker.Open] or [CircuitBreaker.HalfOpen]
* [CircuitBreaker.State.Open] or [CircuitBreaker.State.HalfOpen]
* states.
*
* @param onClosed is a callback for signaling transitions to [CircuitBreaker.State.Closed].
Expand Down Expand Up @@ -551,9 +505,9 @@ private constructor(

public sealed class OpeningStrategy {
internal fun resetFailuresCount(): OpeningStrategy = when (this) {
is Count -> copy(failuresCount = 0)
else -> this
}
is Count -> copy(failuresCount = 0)
else -> this
}
internal abstract fun shouldOpen(): Boolean
internal abstract fun trackFailure(failureAt: TimeMark): OpeningStrategy

Expand Down