Spark Examples

This page discusses running Spark with Mill, in Scala and in Python (PySpark).

Hello Spark

build.mill (download, browse)
package build
import mill._, scalalib._

object foo extends ScalaModule {
  def scalaVersion = "2.12.15"
  def ivyDeps = Seq(
    ivy"org.apache.spark::spark-core:3.5.4",
    ivy"org.apache.spark::spark-sql:3.5.4"
  )

  // Spark needs access to JDK internals that newer JVMs (Java 17+) encapsulate by default
  def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")

  object test extends ScalaTests {
    def ivyDeps = Seq(ivy"com.lihaoyi::utest:0.8.5")
    def testFramework = "utest.runner.Framework"

    def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")
  }

}

This example demonstrates running Spark using Mill: foo.run launches the application in a forked JVM (with the --add-opens flags above), and foo.test runs the utest suite.
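
The application source is not shown above. A minimal foo/src/Foo.scala that would produce the output below might look like this (a sketch; the helloWorld helper is an assumption, named after the test in the output further down):

foo/src/Foo.scala (sketch)
package foo

import org.apache.spark.sql.{DataFrame, SparkSession}

object Foo {
  // build the one-row DataFrame shown in the output below
  def helloWorld(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq("Hello, World!").toDF("message")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HelloSpark").master("local[*]").getOrCreate()
    helloWorld(spark).show()
    spark.stop()
  }
}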

> ./mill foo.run
...
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+
...

> ./mill foo.test
...
+ foo.FooTests.helloWorld should create a DataFrame with one row containing 'Hello, World!'...
...
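
The test behind that output line is likewise not shown; a minimal utest suite, assuming the Foo.helloWorld helper from the sketch above, might look like:

foo/test/src/FooTests.scala (sketch)
package foo

import org.apache.spark.sql.SparkSession
import utest._

object FooTests extends TestSuite {
  // a local SparkSession shared by the tests in this suite
  val spark = SparkSession.builder().appName("FooTests").master("local[*]").getOrCreate()

  def tests = Tests {
    test("helloWorld should create a DataFrame with one row containing 'Hello, World!'") {
      val df = Foo.helloWorld(spark)
      assert(df.count() == 1)
      assert(df.head().getString(0) == "Hello, World!")
    }
  }
}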

Hello PySpark

build.mill (download, browse)
package build
import mill._, pythonlib._

object foo extends PythonModule {
  def mainScript = Task.Source("src/foo.py")
  def pythonDeps = Seq("pyspark==3.5.4")

  object test extends PythonTests with TestModule.Unittest

}

This example demonstrates running PySpark using Mill. The build above is a minimal pythonlib setup, assuming the main script lives at foo/src/foo.py; pythonDeps pulls pyspark from PyPI, and tests run through Python's unittest.
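
A sketch of foo/src/foo.py that would produce the run output below (the hello_world helper is an assumption, mirroring the Scala example):

foo/src/foo.py (sketch)
from pyspark.sql import SparkSession


def hello_world(spark):
    # one-row DataFrame matching the output below
    return spark.createDataFrame([("Hello, World!",)], ["message"])


if __name__ == "__main__":
    spark = SparkSession.builder.appName("HelloPySpark").master("local[*]").getOrCreate()
    hello_world(spark).show()
    spark.stop()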

> ./mill foo.run
...
+-------------+
|      message|
+-------------+
|Hello, World!|
+-------------+
...

> ./mill foo.test
...
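
The unittest suite is also not shown; assuming the hello_world helper above and that the main module is importable as foo, a minimal foo/test/src/test.py might look like:

foo/test/src/test.py (sketch)
import unittest

from pyspark.sql import SparkSession

from foo import hello_world


class TestScript(unittest.TestCase):
    def test_hello_world(self):
        spark = SparkSession.builder.appName("FooTests").master("local[*]").getOrCreate()
        try:
            df = hello_world(spark)
            self.assertEqual(df.count(), 1)
            self.assertEqual(df.first()["message"], "Hello, World!")
        finally:
            spark.stop()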

Semi-Realistic Spark Project with spark-submit

build.mill (download, browse)
package build
import mill._, scalalib._

object `package` extends RootModule with ScalaModule {
  def scalaVersion = "2.12.15"
  def ivyDeps = Seq(
    ivy"org.apache.spark::spark-core:3.5.4",
    ivy"org.apache.spark::spark-sql:3.5.4"
  )

  // Spark needs access to JDK internals that newer JVMs (Java 17+) encapsulate by default
  def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")

  // skip the executable launcher prefix so the assembly is a plain jar that spark-submit can load
  def prependShellScript = ""

  object test extends ScalaTests {
    def ivyDeps = Seq(ivy"com.lihaoyi::utest:0.8.5")
    def testFramework = "utest.runner.Framework"

    def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")
  }

}

This example demonstrates a semi-realistic Spark project: it calculates summary statistics from a transactions.csv passed in as an argument, falling back to the copy bundled in resources when no argument is given.
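
The application itself is not shown; a sketch of what src/Foo.scala might contain, assuming a computeSummary helper named after the test in the output below:

src/Foo.scala (sketch)
package foo

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Foo {
  // group transactions by category and compute the three summary columns
  def computeSummary(transactions: DataFrame): DataFrame =
    transactions
      .groupBy("category")
      .agg(
        sum("amount").as("total_amount"),
        avg("amount").as("average_amount"),
        count(lit(1)).as("transaction_count")
      )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Foo").master("local[*]").getOrCreate()
    // use the CSV passed as an argument, or fall back to the bundled resource
    val path = args.headOption.getOrElse(getClass.getResource("/transactions.csv").getPath)
    val df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    println("Summary Statistics by Category:")
    computeSummary(df).show()
    spark.stop()
  }
}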

> ./mill run
...
Summary Statistics by Category:
+-----------+------------+--------------+-----------------+
|   category|total_amount|average_amount|transaction_count|
+-----------+------------+--------------+-----------------+
|       Food|        70.5|          23.5|                3|
|Electronics|       375.0|         187.5|                2|
|   Clothing|       120.5|         60.25|                2|
+-----------+------------+--------------+-----------------+
...

> ./mill test
...
+ foo.FooTests.computeSummary should compute correct summary statistics...
...
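
The spark-submit.sh wrapper is not shown in this example; a minimal sketch, assuming spark-submit is on the PATH and taking the jar, main class, and program arguments as positional parameters:

spark-submit.sh (sketch)
#!/usr/bin/env bash
# usage: ./spark-submit.sh <jar> <main-class> [args...]
set -e
JAR="$1"
CLASS="$2"
shift 2
spark-submit --class "$CLASS" --master "local[*]" "$JAR" "$@"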

> chmod +x spark-submit.sh

> ./mill show assembly # prepare for spark-submit
".../out/assembly.dest/out.jar"

> ./spark-submit.sh out/assembly.dest/out.jar foo.Foo resources/transactions.csv
...
Summary Statistics by Category:
+-----------+------------+--------------+-----------------+
|   category|total_amount|average_amount|transaction_count|
+-----------+------------+--------------+-----------------+
|       Food|        70.5|          23.5|                3|
|Electronics|       375.0|         187.5|                2|
|   Clothing|       120.5|         60.25|                2|
+-----------+------------+--------------+-----------------+
...