LeaseTracker

mill.exec.Execution.LeaseTracker
class LeaseTracker(indexToTerminal: Array[Task[_]], interGroupDeps: Map[Task[_], Seq[Task[_]]])

Tracks per-task read leases on the workspace lock and releases them once every transitive downstream task that depends on the holder has completed.

The transitive part matters because a direct downstream can forward PathRefs or other data from an upstream output to its own downstreams. Releasing the upstream read lease when only the direct downstream completes would let a concurrent launcher overwrite that output while a later transitive downstream may still read it.

Attributes

Source
Execution.scala
Graph
Supertypes
class Object
trait Matchable
class Any

Members list

Type members

Classlikes

class Retained(val path: Path, val label: String, val key: String, var lease: Lease, var observedVersion: Long)

Attributes

Source
Execution.scala
Supertypes
class Object
trait Matchable
class Any
class State(initialPending: Int)

Attributes

Source
Execution.scala
Supertypes
class Object
trait Matchable
class Any

Value members

Concrete methods

def drain(): Unit

Attributes

Source
Execution.scala
def onCompleted(terminal: Task[_]): Unit

Attributes

Source
Execution.scala
def reacquireDropped(workspaceLocking: LauncherLocking, waitReporter: WaitReporter, block: Boolean = ...): Unit

Attributes

Source
Execution.scala
def releaseHigherThan(key: String): Unit

Attributes

Source
Execution.scala
def retain(task: Task[_], path: Path, label: String, lease: Lease, observedVersion: Long): Unit

Attributes

Source
Execution.scala
def withActiveConsumers[T](terminal: Task[_], reacquire: () => Unit)(body: => T): T

Mark every transitive upstream of terminal as having an active consumer for the duration of body, so releaseHigherThan will not drop their retained reads while body may still read those outputs.

Mark every transitive upstream of terminal as having an active consumer for the duration of body, so releaseHigherThan will not drop their retained reads while body may still read those outputs.

Incrementing alone is not sufficient: a sibling future may already have dropped (or be concurrently dropping) one of those reads in the window before we incremented, and the increment never re-takes a lease that is already gone. So after marking the upstreams active — which blocks any further drops, since releaseHigherThan skips tasks whose activeConsumers is non-zero — we reacquire() to re-take and re-validate anything dropped beforehand, before body (user code or worker cleanup) can observe an unprotected upstream dest/. reacquire may throw RetryDueToDroppedTaskLock if an upstream was rewritten by a peer while it was dropped, forcing a from-scratch retry.

Attributes

Source
Execution.scala

Concrete fields

val states: ConcurrentHashMap[Task[_], State]

Attributes

Source
Execution.scala