Tasks
One of Mill’s core abstractions is its Task Graph: this is how Mill defines, orders and caches work it needs to do, and exists independently of any support for building Java, Kotlin, or Scala.
Mill task graphs are primarily built using methods and macros defined on
mill.define.Task, whose type is often aliased as T[_] for conciseness:
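For example, a task definition can annotate its type with the T[_] alias explicitly, as in this minimal sketch (the name and value here are illustrative only, not part of the example build below):

def greeting: T[String] = Task { "hello" } // explicitly annotated as T[String]; the annotation can usually be inferred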
Task Cheat Sheet
The following table might help you make sense of the small collection of different Task types:
| | Target | Command | Source/Input | Persistent Task | Worker | Anonymous Task |
|---|---|---|---|---|---|---|
| Cached to Disk | X | | | X | | |
| JSON Writable | X | X | X | X | | |
| JSON Readable | X | | | X | | |
| CLI Runnable | X | X | | X | | |
| Takes Arguments | | X | | | | X |
| Cached In-Memory | | | | | X | |
The following is a simple self-contained example using Mill to compile Java,
making use of the most common Task.Source, Task, and Task.Command types
to define a simple build graph with some input source files and intermediate build steps:
package build
import mill._
def mainClass: T[Option[String]] = Some("foo.Foo")
def sources = Task.Source("src")
def resources = Task.Source("resources")
def compile = Task {
val allSources = os.walk(sources().path)
os.call(("javac", allSources, "-d", Task.dest))
PathRef(Task.dest)
}
def assembly = Task {
for (p <- Seq(compile(), resources())) os.copy(p.path, Task.dest, mergeFolders = true)
val mainFlags = mainClass().toSeq.flatMap(Seq("-e", _))
os.call(
("jar", "-c", mainFlags, "-f", Task.dest / "assembly.jar", "."),
cwd = Task.dest
)
PathRef(Task.dest / "assembly.jar")
}
def run(args: String*) = Task.Command {
os.call(("java", "-jar", assembly().path, args), stdout = os.Inherit)
}
This code defines the following task graph, with the boxes being the tasks and the arrows representing the data-flow between them:
This example does not use any of Mill’s builtin support for building Java or
Scala projects, and instead builds a pipeline "from scratch" using Mill
tasks and javac/jar/java subprocesses. It makes use of the three most
common kinds of tasks: Task.Source, Task, and Task.Command.
> ./mill show assembly
".../out/assembly.dest/assembly.jar"
> java -jar out/assembly.dest/assembly.jar i am cow
Foo.value: 31337
args: i am cow
> unzip -p out/assembly.dest/assembly.jar foo.txt
My Example Text
When you first evaluate assembly (e.g. via mill assembly at the command
line), it will evaluate all the defined tasks: mainClass, sources,
compile, and assembly.
Subsequent invocations of mill assembly will evaluate only as much as is
necessary, depending on what input sources changed:
-
If the files in sources change, it will re-evaluate compile, and assembly (red)
-
If the files in resources change, it will only re-evaluate assembly (red) and use the cached output of compile (green)
run behaves differently from assembly: as a Task.Command it is executed
every time you run it, even if none of its upstream Tasks or Sources changed.
Command outputs are never cached, and thus can never be
re-used. The only time Commands may be
skipped is due to Selective Test Execution.
Primary Tasks
There are three primary kinds of Tasks that you should care about:
Sources
package build
import mill.{Module, T, _}
def sources = Task.Source { "src" }
def resources = Task.Source { "resources" }
Sources are defined using Task.Source{…}, taking one os.Path, or Task.Sources{…},
taking multiple os.Paths as arguments. A Source's build signature/hashCode depends
not just on the path it refers to (e.g. foo/bar/baz) but also on the MD5 hash of the
file contents or folder tree at that path.
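As a sketch of the multi-path variant (assuming Task.Sources accepts several relative paths the same way Task.Source accepts one in the examples on this page, and the name is illustrative):

def allInputDirs = Task.Sources("src", "resources") // watches both folders; yields a Seq of PathRefs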
Task.Source and Task.Sources are common inputs in any Mill build:
they watch source files and folders and cause downstream tasks to
re-compute if a change is detected. Non-file inputs to the build can
also be captured via the more general Input tasks.
Tasks
def allSources = Task {
os.walk(sources().path)
.filter(_.ext == "java")
.map(PathRef(_))
}
def lineCount: T[Int] = Task {
println("Computing line count")
allSources()
.map(p => os.read.lines(p.path).size)
.sum
}
Tasks are defined using the def foo = Task {…} syntax, and dependencies
on other tasks are defined using foo() to extract the value from them.
Apart from the foo() calls, the Task {…} block contains arbitrary code that
does some work and returns a result. Note that tasks cannot have circular dependencies
between each other.
The os.walk and os.read.lines statements above are from the
OS-Lib library, which provides all common
filesystem and subprocess operations for Mill builds. You can see the OS-Lib library
documentation for more details:
> ./mill show lineCount
Computing line count
18
> ./mill show lineCount # line count already cached, doesn't need to be computed
18
If a cached task’s inputs change but its output does not, then
downstream tasks do not re-evaluate, e.g. someone may change a
comment within an input source file that doesn’t affect the output classfiles.
This is determined using the .hashCode of the cached task’s return value.
Furthermore, when code changes occur, cached tasks only invalidate if the code change
may directly or indirectly affect it. e.g. adding a comment to lineCount will
not cause it to recompute:
def lineCount: T[Int] = Task {
println("Computing line count")
+ // Hello World
allSources()
.map(p => os.read.lines(p.path).size)
.sum
But changing the code of the cached task or any upstream helper method will cause the
old value to be invalidated and a new value re-computed (with a new println)
next time it is invoked:
def lineCount: T[Int] = Task {
- println("Computing line count")
+ println("Computing line count!!!")
allSources()
.map(p => os.read.lines(p.path).size)
.sum
For more information on how invalidating tasks based on code-changes works, see PR#2417 that implemented it.
The return-value of cached tasks has to be JSON-serializable via
uPickle. You can run cached tasks directly from the command
line, or use show if you want to see the JSON content or pipe it to
external tools. See the uPickle library documentation for more details:
Task.dest
Each task is assigned a unique Task.dest
folder on disk (e.g. classFiles is given out/classFiles.dest/). Task.dest is
reset every time a task is recomputed, and can be used as scratch space or to store the task’s
output files. Any metadata returned from the task is automatically JSON-serialized
and stored at out/classFiles.json adjacent to its .dest folder. If you want to return a file
or a set of files as the result of a Task, write them to disk within your Task.dest
folder and return a PathRef() referencing the files or folders
you want to return:
def classFiles = Task {
println("Generating classfiles")
os.proc("javac", allSources().map(_.path), "-d", Task.dest)
.call(cwd = Task.dest)
PathRef(Task.dest)
}
def jar = Task {
println("Generating jar")
os.copy(classFiles().path, Task.dest, mergeFolders = true)
os.copy(resources().path, Task.dest, mergeFolders = true)
os.proc("jar", "-cfe", Task.dest / "foo.jar", "foo.Foo", ".").call(cwd = Task.dest)
PathRef(Task.dest / "foo.jar")
}
> ./mill jar
Generating classfiles
Generating jar
> ./mill show jar
".../out/jar.dest/foo.jar"
os.pwd within a Task is set to the Task.dest folder by default.
This is to stop you from accidentally reading and writing files to the base repository root,
which would cause problems with Mill’s caches not invalidating properly or files from different
tasks colliding and causing issues. In the rare case where
you truly need the Mill project root folder, you can access it via Task.workspace
Dependent Tasks
Tasks can depend on other tasks via the foo() syntax.
def largeFile = Task {
println("Finding Largest File")
allSources()
.map(_.path)
.filter(_.ext == "java")
.maxBy(os.read.lines(_).size)
}
def hugeFileName = Task {
if (lineCount() > 999) largeFile().last
else "<no-huge-file>"
}
The graph of inter-dependent tasks is evaluated in topological order; that
means that the body of a task will not even begin to evaluate if one of its
upstream dependencies has failed. Similarly, even if an upstream task is
not used in one branch of an if condition, it will get computed regardless
before the if condition is even considered.
The output below demonstrates this behavior, with the println defined
in def largeFile above running even though the largeFile() branch of the
if conditional does not get used:
> ./mill show lineCount
18
> ./mill show hugeFileName # This still runs `largeFile` even though `lineCount() < 999`
Finding Largest File
"<no-huge-file>"
Custom Return Types
All Task return types must be JSON serializable via
uPickle, and
uPickle comes with built-in support for most Scala primitive types and
builtin data structures: tuples, collections, PathRefs, etc. can be
returned and automatically serialized/de-serialized as necessary. One
notable exception is case classes: if you want to return your own
case class, you must mark it JSON-serializable by adding the following
implicit to its companion object:
case class ClassFileData(totalFileSize: Long, largestFile: String)
object ClassFileData {
implicit val rw: upickle.default.ReadWriter[ClassFileData] = upickle.default.macroRW
}
def summarizeClassFileStats = Task {
val files = os.walk(classFiles().path)
ClassFileData(
totalFileSize = files.map(os.size(_)).sum,
largestFile = files.maxBy(os.size(_)).last
)
}
> ./mill show summarizeClassFileStats
{
"totalFileSize": ...,
"largestFile": "..."
}
For more details on how to use uPickle, check out the uPickle library documentation
Commands
def run(mainClass: String, args: String*) = Task.Command {
os.proc(
"java",
"-cp",
s"${classFiles().path}:${resources().path}",
mainClass,
args
)
.call(stdout = os.Inherit)
}
Defined using Task.Command {…} syntax, Commands can run arbitrary code, with
dependencies declared using the same foo() syntax (e.g. classFiles() above).
Commands can be parametrized, but their output is not cached, so they will
re-evaluate every time even if none of their inputs have changed. The only time
Commands may be skipped is due to Selective Test Execution.
A command with no parameter is defined as def myCommand() = Task.Command {…}.
It is a compile error if () is missing.
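For instance, a minimal zero-argument command might look like this sketch (the name and output are illustrative only):

def showVersion() = Task.Command {
  println("0.0.1") // note the mandatory `()` after the command name in the definition
}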
Commands can take command-line params, parsed by the MainArgs
library. Thus the signature def run(mainClass: String, args: String*) takes
params of the form --main-class <str> <arg1> <arg2> … <argn>:
> ./mill run --main-class foo.Foo hello world
Foo.value: 31337
args: hello world
foo.txt resource: My Example Text
Command line arguments can take most primitive types: String, Int, Boolean, etc.,
along with Option[T] representing optional values and Seq[T] representing repeatable values,
and mainargs.Flag representing flags and mainargs.Leftover[T] representing any command line
arguments not parsed earlier. Default values for command line arguments are also supported.
See the mainargs documentation for more details:
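As an illustrative sketch (this hypothetical demo command is not part of the example build above), a command signature exercising these parameter types might look like:

def demo(
  name: String,                      // required named argument: --name <str>
  count: Int = 1,                    // has a default value, may be omitted
  suffix: Option[String] = None,     // optional value: --suffix <str>
  tags: Seq[String] = Nil,           // repeatable: --tags a --tags b
  verbose: mainargs.Flag,            // boolean flag: --verbose
  rest: mainargs.Leftover[String]    // any remaining arguments
) = Task.Command {
  println((name, count, suffix, tags, verbose.value, rest.value))
}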
By default, all command parameters need to be named, except for variadic parameters
of type T* or mainargs.Leftover[T], or those marked as @arg(positional = true).
You can also use the flag --allow-positional-command-args to globally allow
arguments to be passed positionally, as shown below:
> ./mill run foo.Foo hello world # this raises an error because `--main-class` is not given
error: Missing argument: --mainClass <str>
Expected Signature: run
--mainClass <str>
args <str>...
...
> ./mill --allow-positional run foo.Foo hello world # this succeeds due to --allow-positional
Foo.value: 31337
args: hello world
foo.txt resource: My Example Text
Like Tasks, a command only evaluates after all its upstream dependencies have completed, and will not begin to run if any upstream dependency has failed.
Commands are assigned the same scratch/output folder out/run.dest/ as
Tasks are, and their returned metadata stored at the same out/run.json
path for consumption by external tools.
Other Tasks
Inputs
package build
import mill._
def myInput = Task.Input {
os.proc("git", "rev-parse", "HEAD").call(cwd = Task.workspace)
.out
.text()
.trim()
}
A generalization of Sources, Task.Inputs are tasks that re-evaluate
every time (unlike Anonymous Tasks), containing an
arbitrary block of code.
Inputs can be used to force re-evaluation of some external property that may
affect your build. For example, if I have a cached Task like gitStatusTask below that
calls out to git to compute the latest commit hash and message directly,
that task does not have any upstream Task inputs and so will never re-compute
even if the external git status changes:
def gitStatusTask = Task {
"version-" +
os.proc("git", "log", "-1", "--pretty=format:%h-%B ")
.call(cwd = Task.workspace)
.out
.text()
.trim()
}
> git init .
> git commit --allow-empty -m "Initial-Commit"
> ./mill show gitStatusTask
"version-...-Initial-Commit"
> git commit --allow-empty -m "Second-Commit"
> ./mill show gitStatusTask # Mill didn't pick up the git change!
"version-...-Initial-Commit"
gitStatusTask will not know that the output of git log can change, and will
not know to re-evaluate when your git log does change. This means
gitStatusTask will continue to use any previously cached value, and
gitStatusTask's output will be out of date!
To fix this, you can wrap your git log in a Task.Input:
def gitStatusInput = Task.Input {
os.proc("git", "log", "-1", "--pretty=format:%h-%B ")
.call(cwd = Task.workspace)
.out
.text()
.trim()
}
def gitStatusTask2 = Task { "version-" + gitStatusInput() }
This makes gitStatusInput always re-evaluate on every build; only if
the output of gitStatusInput changes will gitStatusTask2 re-compute:
> git commit --allow-empty -m "Initial-Commit"
> ./mill show gitStatusTask2
"version-...-Initial-Commit"
> git commit --allow-empty -m "Second-Commit"
> ./mill show gitStatusTask2 # Mill picked up git change
"version-...-Second-Commit"
Note that because Task.Inputs re-evaluate every time, you should ensure that the
code you put in Task.Input runs quickly. Ideally it should just be a simple check
"did anything change?" and any heavy-lifting should be delegated to downstream
tasks where it can be cached if possible.
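For example, a hedged sketch of this pattern: the input does only a cheap check (here, reading a small, hypothetical version file), while the expensive work lives in a downstream cached task:

def toolVersionInput = Task.Input {
  // cheap check: read a small version file; this runs on every evaluation
  os.read(Task.workspace / "tool-version.txt").trim()
}
def expensiveSetup = Task {
  // heavy work lives here; it is cached and only re-runs when the version string changes
  println("Setting up tool version " + toolVersionInput())
  toolVersionInput()
}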
System Properties Inputs
One major use case of Input tasks is to make your build configurable via
JVM system properties or environment variables. If you directly access
sys.props or sys.env inside a cached Task, the
cached value will be used even if the property or environment variable changes
in subsequent runs, when you really want it to be re-evaluated. Thus, accessing
system properties should be done in a Task.Input, and usage of the property
should be done downstream in a cached task:
def myPropertyInput = Task.Input {
sys.props("my-property")
}
def myPropertyTask = Task {
"Hello Prop " + myPropertyInput()
}
> ./mill show myPropertyTask
"Hello Prop null"
> ./mill -Dmy-property=world show myPropertyTask # Task is correctly invalidated when prop is added
"Hello Prop world"
> ./mill show myPropertyTask # Task is correctly invalidated when prop is removed
"Hello Prop null"
Again, Task.Input runs every time, and thus you should only do the bare minimum
in your Task.Input that is necessary to detect changes. Any further processing
should be done in downstream cached tasks to allow for proper
caching and re-use.
Environment Variable Inputs
Like system properties, environment variables should be referenced in Task.Inputs. Unlike
system properties, you need to use the special API Task.env to access the environment,
due to JVM limitations:
def myEnvInput = Task.Input {
Task.env.getOrElse("MY_ENV", null)
}
def myEnvTask = Task {
"Hello Env " + myEnvInput()
}
> ./mill show myEnvTask
"Hello Env null"
> MY_ENV=world ./mill show myEnvTask # Task is correctly invalidated when env is added
"Hello Env world"
> ./mill show myEnvTask # Task is correctly invalidated when env is removed
"Hello Env null"
Persistent Tasks
Persistent tasks defined using Task(persistent = true) are similar to normal
cached Tasks, except their Task.dest folder is not cleared before every
evaluation. This makes them useful for caching things on disk in a more
fine-grained manner than Mill’s own Task-level caching: the task can
maintain a cache of one or more files on disk, and decide itself which files
(or parts of which files!) need to invalidate, rather than having all generated
files wiped out every time (which is the default behavior for normal Tasks).
Below is a semi-realistic example of using Task(persistent = true) to compress
files in an input folder, cache the previously-compressed files in Task.dest / "cache",
and re-use previously-compressed files if a file in the
input folder did not change:
package build
import mill._, scalalib._
import java.util.Arrays
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
def data = Task.Source("data")
def compressedData = Task(persistent = true) {
  println("Evaluating compressedData")
  os.makeDir.all(Task.dest / "cache")
  os.remove.all(Task.dest / "compressed")
  for (p <- os.list(data().path)) {
    val compressedPath = Task.dest / "compressed" / s"${p.last}.gz"
    val bytes = os.read.bytes(p)
    val hash = Arrays.hashCode(bytes)
    val cachedPath = Task.dest / "cache" / hash.toHexString
    if (!os.exists(cachedPath)) {
      println("Compressing: " + p.last)
      os.write(cachedPath, compressBytes(bytes))
    } else {
      println("Reading Cached from disk: " + p.last)
    }
    os.copy(cachedPath, compressedPath, createFolders = true)
  }
  os.list(Task.dest / "compressed").map(PathRef(_))
}
def compressBytes(input: Array[Byte]) = {
val bos = new ByteArrayOutputStream(input.length)
val gzip = new GZIPOutputStream(bos)
gzip.write(input)
gzip.close()
bos.toByteArray
}
In this example, we implement a compressedData task that takes a folder
of files in data and compresses them, while maintaining a cache of
compressed contents for each file. That means that if the data folder
is modified, but some files remain unchanged, those files would not be
unnecessarily re-compressed when compressedData evaluates.
Since persistent tasks have long-lived state on disk that lives beyond a
single evaluation, this raises the possibility of the disk contents getting
into a bad state and causing all future evaluations to fail. It is left up
to the user of Task(persistent = true) to ensure their implementation
is eventually consistent. You can also use mill clean to manually purge
the disk contents to start fresh.
> ./mill show compressedData
Evaluating compressedData
Compressing: hello.txt
Compressing: world.txt
[
".../hello.txt.gz",
".../world.txt.gz"
]
> ./mill compressedData # when no input changes, compressedData does not evaluate at all
> sed -i.bak 's/Hello/HELLO/g' data/hello.txt
> ./mill compressedData # when one input file changes, only that file is re-compressed
Compressing: hello.txt
Reading Cached from disk: world.txt
> ./mill clean compressedData
> ./mill compressedData
Evaluating compressedData
Compressing: hello.txt
Compressing: world.txt
Workers
Mill workers defined using Task.Worker are long-lived in-memory objects that
can persist across multiple evaluations. These are similar to persistent
tasks in that they let you cache things, but the fact that they let you
cache the worker object in-memory allows for greater performance and
flexibility: you are no longer limited to caching only serializable data
and paying the cost of serializing it to disk every evaluation.
Common things to put in workers include:
-
References to third-party daemon processes, e.g. Webpack or wkhtmltopdf, which perform their own in-memory caching
-
Classloaders containing plugin code, to avoid classpath conflicts while also avoiding classloading cost every time the code is executed
This example uses a Worker to provide a simple two-level cache: in-memory caching
for compressed files, in addition to caching them on disk. This means that if the
Mill process is persistent (e.g. with --watch/-w) the cache lookups are instant,
but even if the Mill process is restarted it can load the cache values from disk
without having to recompute them:
package build
import mill._, scalalib._
import java.util.Arrays
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
def data = Task.Source("data")
def compressWorker = Task.Worker { new CompressWorker(Task.dest) }
def compressedData = Task {
println("Evaluating compressedData")
for (p <- os.list(data().path)) {
os.write(
Task.dest / s"${p.last}.gz",
compressWorker().compress(p.last, os.read.bytes(p))
)
}
os.list(Task.dest).map(PathRef(_))
}
class CompressWorker(dest: os.Path) {
  val cache = collection.mutable.Map.empty[Int, Array[Byte]]
  def compress(name: String, bytes: Array[Byte]): Array[Byte] = {
    val hash = Arrays.hashCode(bytes)
    if (!synchronized(cache.contains(hash))) {
      val cachedPath = dest / hash.toHexString
      if (!os.exists(cachedPath)) {
        println("Compressing: " + name)
        val compressed = compressBytes(bytes)
        synchronized {
          cache(hash) = compressed
          os.write(cachedPath, cache(hash))
        }
      } else {
        println("Cached from disk: " + name)
        synchronized {
          cache(hash) = os.read.bytes(cachedPath)
        }
      }
    } else {
      println("Cached from memory: " + name)
    }
    synchronized { cache(hash) }
  }
}
def compressBytes(input: Array[Byte]) = {
val bos = new ByteArrayOutputStream(input.length)
val gzip = new GZIPOutputStream(bos)
gzip.write(input)
gzip.close()
bos.toByteArray
}
The initialization of a Task.Worker's value is single threaded,
but usage of the worker’s value may be done concurrently. The user of
Task.Worker is responsible for ensuring its value is safe to use in a
multi-threaded environment via techniques like locks, atomics, or concurrent data
structures. The example above uses synchronized{} around all access to the
shared cache state to allow the slow compressBytes operation to run in
parallel while the fast mutation of the cache is single-threaded.
Workers live as long as the Mill process. By default, consecutive mill
commands in the same folder will re-use the same Mill process and workers,
unless --no-server is passed which will terminate the Mill process and
workers after every command. Commands run repeatedly using --watch will
also preserve the workers between them.
Workers can also make use of their Task.dest folder as a cache that persists
when the worker shuts down, as a second layer of caching. The example usage
below demonstrates how using the --no-server flag will make the worker
read from its disk cache, where it would normally have read from its
in-memory cache.
> ./mill show compressedData
Evaluating compressedData
Compressing: hello.txt
Compressing: world.txt
[
".../hello.txt.gz",
"...world.txt.gz"
]
> ./mill compressedData # when no input changes, compressedData does not evaluate at all
> sed -i.bak 's/Hello/HELLO/g' data/hello.txt
> ./mill compressedData # without --no-server, we read the data from memory
Compressing: hello.txt
Cached from memory: world.txt
> ./mill --no-server compressedData # with --no-server, we read the data from disk
Compressing: hello.txt
Cached from disk: world.txt
Mill uses workers to manage long-lived instances of the Zinc Incremental Scala Compiler and the Scala.js Optimizer. This lets us keep them in-memory with warm caches and fast incremental execution.
Like any other task, you can use ./mill clean to wipe out any cached in-memory or
on-disk state belonging to workers. This may be necessary if your worker implementation
has bugs that cause the worker disk or in-memory data structures to get into a bad state.
Autocloseable Workers
As Workers may also hold limited resources, it may be necessary to free up these
resources once a worker is no longer needed. For example, java.net.URLClassLoaders
need their .close() method to be called, and third-party subprocesses spawned by os.spawn
need their .destroy() method to be called; otherwise these externally-managed resources may leak
and cause your build or system to run out of memory.
To implement resource cleanup, your worker can implement java.lang.AutoCloseable.
Once the worker is no longer needed, Mill will call the close() method on it before any newer version of this worker is created.
import mill._
import java.lang.AutoCloseable
class MyWorker() extends AutoCloseable {
// ...
override def close() = { /* cleanup and free resources */ }
}
def myWorker = Task.Worker { new MyWorker() }
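For example, here is a hedged sketch of a worker owning one of the externally-managed subprocesses mentioned above (the some-daemon executable and the worker name are hypothetical):

class DaemonWorker() extends AutoCloseable {
  // hypothetical long-lived helper process, started once and re-used across evaluations
  val daemon: os.SubProcess = os.spawn("some-daemon")
  // terminated when Mill replaces or shuts down this worker
  override def close() = daemon.destroy()
}
def daemonWorker = Task.Worker { new DaemonWorker() }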
CachedFactory Workers
One very common use case for workers is managing long-lived mutable state. The issue with long-lived mutable state is that in the presence of parallelism (the default in Mill), managing such state can be tricky:
-
If you allow unrestricted access to the mutable state across multiple threads, you are subject to race conditions and non-deterministic bugs
-
If you just synchronize/lock all access to the mutable state, you lose all benefits for parallelism
-
If you re-generate the mutable state each time for each thread, you lose the benefits of it being long lived
The solution to these issues is to maintain an in-memory cache, with proper locking,
eviction, and invalidation. Doing so is tedious and error prone, and so Mill provides
the CachedFactory helper to make it easier. The example below re-implements a simplified
version of the CompressWorker we saw earlier, but uses CachedFactory to cache, reset, and
re-use the ByteArrayOutputStream between calls to cachedCompressWorker().compress:
import mill._
import java.lang.AutoCloseable
import mill.api.CachedFactory
import java.io.ByteArrayOutputStream
import java.util.Arrays
import java.util.zip.GZIPOutputStream
def cachedCompressWorker = Task.Worker { new CachedCompressWorker(Task.dest) }
class CachedCompressWorker(dest: os.Path) extends CachedFactory[Unit, ByteArrayOutputStream] {
  def setup(key: Unit): ByteArrayOutputStream = {
    println("setup ByteArrayOutputStream")
    new ByteArrayOutputStream()
  }
  def teardown(key: Unit, value: ByteArrayOutputStream): Unit = {
    println("teardown ByteArrayOutputStream")
    value.reset()
  }
  def maxCacheSize = 2

  val cache = collection.mutable.Map.empty[Int, Array[Byte]]
  def compress(name: String, bytes: Array[Byte]): Array[Byte] = {
    val hash = Arrays.hashCode(bytes)
    if (!synchronized(cache.contains(hash))) {
      println("Compressing: " + name)
      val compressed = withValue(()) { bos => compressBytes2(bos, bytes) }
      synchronized { cache(hash) = compressed }
    } else {
      println("Cached from memory: " + name)
    }
    synchronized { cache(hash) }
  }
}
def compressBytes2(bos: ByteArrayOutputStream, input: Array[Byte]) = {
bos.reset()
Thread.sleep(1000) // Simulate a slow operation
val gzip = new GZIPOutputStream(bos)
gzip.write(input)
gzip.close()
bos.toByteArray
}
-
CachedFactory takes two type parameters, a K key type and a V value type. In this case K is Unit since all ByteArrayOutputStreams are the same, but if you are caching things which take some kind of configuration (e.g. compilers with compiler flags) you can set K to be the config class so that values with different input configuration are cached separately (see the sketch after this list).
-
def setup creates the ByteArrayOutputStream when a new one is necessary
-
def maxCacheSize configures the maximum number of cached entries to keep around while they are not in use
-
def teardown cleans up the ByteArrayOutputStreams when the count of cached entries exceeds that maxCacheSize
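As a hedged sketch of the non-Unit key case from the first bullet above (the HypotheticalCompiler class, its flags, and the compilerCacheWorker name are all illustrative, not a real Mill API; only the CachedFactory methods mirror the example above):

case class CompilerConfig(flags: Seq[String])
class HypotheticalCompiler(flags: Seq[String]) {
  def compile(src: String): String = s"compiled($src with $flags)" // stand-in for real work
  def shutdown(): Unit = ()
}
class CompilerCache extends CachedFactory[CompilerConfig, HypotheticalCompiler] {
  def setup(key: CompilerConfig) = new HypotheticalCompiler(key.flags) // one instance per distinct config
  def teardown(key: CompilerConfig, value: HypotheticalCompiler) = value.shutdown()
  def maxCacheSize = 2
}
def compilerCacheWorker = Task.Worker { new CompilerCache() }

def compiledWithFlags = Task {
  // instances created with different CompilerConfig keys are cached separately
  compilerCacheWorker().withValue(CompilerConfig(Seq("-deprecation"))) { compiler =>
    compiler.compile("Foo.scala")
  }
}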
Although the CachedCompressWorker example above is synthetic (you don’t actually need to reset, reuse, and teardown
ByteArrayOutputStreams) the same techniques would apply to any long-lived mutable state
or components you need to manage. This is especially important when
running JVM code in classloaders or subprocesses,
as those are both expensive to initialize and need to be properly closed or terminated
when you are done with them
The above cachedCompressWorker can be used as shown below, with three
def compressed* tasks using it to call .compress:
def compressed1 = Task {
cachedCompressWorker().compress("foo.txt", "I am cow".getBytes)
}
def compressed2 = Task {
cachedCompressWorker().compress("bar.txt", "Hear me moo".getBytes)
}
def compressed3 = Task {
cachedCompressWorker().compress("qux.txt", "I weigh twice as much as you".getBytes)
}
> # 3 streams are created on demand, 1 is torn down after due to the maxCacheSize = 2 limit
> ./mill show '{compressed1,compressed2,compressed3}'
setup ByteArrayOutputStream
setup ByteArrayOutputStream
setup ByteArrayOutputStream
Compressing: foo.txt
Compressing: bar.txt
Compressing: qux.txt
teardown ByteArrayOutputStream
{
"compressed1": ...
"compressed2": ...
"compressed3": ...
}
> # `clean` clears the CachedFactory and tears down the two streams in the cache
> ./mill clean cachedCompressWorker
teardown ByteArrayOutputStream
teardown ByteArrayOutputStream
Anonymous Tasks
You can define anonymous tasks using the Task.Anon {…} syntax. These are
not runnable from the command-line, but can be used to share common code you
find yourself repeating in Tasks and Commands.
package build
import mill._, define.Task
def data = Task.Source("data")
def anonTask(fileName: String): Task[String] = Task.Anon {
os.read(data().path / fileName)
}
def helloFileData = Task { anonTask("hello.txt")() }
def printFileData(fileName: String) = Task.Command {
println(anonTask(fileName)())
}
An anonymous task’s output does not need to be JSON-serializable, its output is not cached, and it can be defined with or without arguments. Unlike Tasks or Commands, anonymous tasks can be defined anywhere and passed around any way you want, until you finally make use of them within a downstream task or command.
While an anonymous task's own output is not cached, if it is used in a
downstream cached task (like anonTask inside helloFileData above) and that task's
upstream inputs haven’t changed, the downstream task's cached output will be used
and the anonymous task's evaluation will be skipped altogether.
> ./mill show helloFileData
"Hello"
> ./mill printFileData --file-name hello.txt
Hello
> ./mill printFileData --file-name world.txt
World!
(Experimental) Forking Concurrent Futures within Tasks
Mill provides the Task.fork.async and Task.fork.await APIs for spawning async
futures within a task and aggregating their results later. This API is used by Mill
to support parallelizing test classes,
but can be used in your own tasks as well:
package build
import mill._
def taskSpawningFutures = Task {
val f1 = Task.fork.async(dest = Task.dest / "future-1", key = "1", message = "First Future") {
logger =>
println("Running First Future inside " + os.pwd)
Thread.sleep(3000)
val res = 1
println("Finished First Future")
res
}
val f2 = Task.fork.async(dest = Task.dest / "future-2", key = "2", message = "Second Future") {
logger =>
println("Running Second Future inside " + os.pwd)
Thread.sleep(3000)
val res = 2
println("Finished Second Future")
res
}
Task.fork.await(f1) + Task.fork.await(f2)
}
> ./mill show taskSpawningFutures
[1] Running First Future inside .../out/taskSpawningFutures.dest/future-1
[2] Running Second Future inside .../out/taskSpawningFutures.dest/future-2
[1] Finished First Future
[2] Finished Second Future
3
Task.fork.async takes several parameters in addition to the code block to be run:
-
dest is a folder in which the async future is to be run, overriding os.pwd for the duration of the future
-
key is a short prefix prepended to log lines to let you easily identify the future’s log lines and distinguish them from logs of other futures and tasks running concurrently
-
message is a one-line description of what the future is doing
-
priority: 0 means the same priority as other Mill tasks, negative values <0 mean increasingly high priority, positive values >0 mean increasingly low priority
Each block spawned by Task.fork.async is assigned a dedicated logger with its own
.log file and terminal UI integration.
Futures spawned by Task.fork.async count towards Mill’s -j/--jobs concurrency limit
(which defaults to one-per-core), so you can freely use Task.fork.async without worrying
about spawning too many concurrent threads and causing CPU or memory contention. Task.fork
uses Java’s built in ForkJoinPool and ManagedBlocker infrastructure under the hood
to effectively manage the number of running threads.
While scala.concurrent and java.util.concurrent can also be used to spawn thread
pools and run async futures, Task.fork provides a way to do so that integrates with Mill’s
existing concurrency, sandboxing and logging systems. Thus you should always prefer to
run async futures on Task.fork whenever possible.