How to start a Spark cluster in minutes

We are always in a hurry. Docker lets us run a lot of software without installing it, so no time is wasted on installation. After all, what we want to learn is the end result, not the installation. So why not let Docker take care of it?

What Docker provides:

  1. A closed network subnet where the Docker containers can talk to each other.
  2. A default gateway for the containers' outbound traffic.
  3. Pre-built images, so you literally need to install nothing.
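
Once the cluster described below is up, you can peek at that private network and its gateway with Docker's own tooling. A quick sketch: Compose names the network <project>_default after your project directory (so the name below is an assumption; check docker network ls for the exact one).

$ docker network ls
$ docker network inspect scalacluster_default | grep -E "Subnet|Gateway"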

Prerequisites:

  1. Docker and docker-compose (1.9.0 or higher) installed.
  2. A Linux box. This can probably also be done on Windows, but I have not explored that.
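
You can quickly confirm the installed versions before moving on:

$ docker --version
$ docker-compose --version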

Install Spark:

  1. Create a directory at any location. This directory name will be used for your project.
  2. $ mkdir ScalaCluster ; cd ScalaCluster
  3. Copy the following content into docker-compose.yml:
version: "2"

services:
  master:
    image: singularities/spark
    command: start-spark master
    hostname: master
    ports:
      - "6066:6066"
      - "7070:7070"
      - "8080:8080"
      - "50070:50070"
  worker:
    image: singularities/spark
    command: start-spark worker master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    links:
      - master
  4. Run the following command to start the containers:
    1. $ docker-compose up -d
  5. Your 2-node cluster is now up and running.
    1. You should see something like this:
$ docker ps 
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                                                                                                                                   NAMES
3f63c7601c93        singularities/spark   "start-spark worke..."   10 minutes ago      Up 10 minutes       6066/tcp, 7077/tcp, 8020/tcp, 8080-8081/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50070/tcp, 50470/tcp                                                                           spark_worker_1
843460507f96        singularities/spark   "start-spark master"     10 minutes ago      Up 10 minutes       0.0.0.0:6066->6066/tcp, 7077/tcp, 0.0.0.0:7070->7070/tcp, 8020/tcp, 8081/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 0.0.0.0:8080->8080/tcp, 19888/tcp, 0.0.0.0:50070->50070/tcp, 50470/tcp   spark_master_1
  6. If you wish to expand your cluster to more nodes, you can scale it out using the following command:
    1. $ docker-compose scale worker=2
    2. You should now see an additional Docker container, as below:
$ docker ps 
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                                                                                                                                                                                   NAMES
31488cf276ee        singularities/spark   "start-spark worke..."   7 seconds ago       Up 5 seconds        6066/tcp, 7077/tcp, 8020/tcp, 8080-8081/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50070/tcp, 50470/tcp                                                                           spark_worker_2
3f63c7601c93        singularities/spark   "start-spark worke..."   10 minutes ago      Up 10 minutes       6066/tcp, 7077/tcp, 8020/tcp, 8080-8081/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 19888/tcp, 50070/tcp, 50470/tcp                                                                           spark_worker_1
843460507f96        singularities/spark   "start-spark master"     10 minutes ago      Up 10 minutes 0.0.0.0:6066->6066/tcp, 7077/tcp, 0.0.0.0:7070->7070/tcp, 8020/tcp, 8081/tcp, 10020/tcp, 13562/tcp, 14000/tcp, 0.0.0.0:8080->8080/tcp, 19888/tcp, 0.0.0.0:50070->50070/tcp, 50470/tcp   spark_master_1

Your 3-node cluster is up and running now.
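
As a quick sanity check, the master's web UI (published on port 8080 in the compose file above) should list all the registered workers. Open http://localhost:8080 in a browser, or grep the page from the shell with something like:

$ curl -s http://localhost:8080 | grep -i worker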

How to connect to your cluster (Scala):

spark-shell is the primary way to connect to your Spark cluster. The services are up and running, but they live inside the Docker containers. Hence, you will have to install a Spark client on your local machine and connect using that local client. Once it is installed, you can connect to the Dockerized Spark cluster by passing the master connection info to spark-shell.

To connect to your master, you need to figure out the IP address on which the master container is running. To find it, you can use docker inspect.

$ docker inspect 843460507f96 | grep IPAddress
 "SecondaryIPAddresses": null,
 "IPAddress": "",
 "IPAddress": "172.18.0.2",

Use this IP address to connect with spark-shell. You get spark-shell by installing a Spark client wherever you wish. I did it by extracting the tarball (Spark 2.1.0) and simply using the bundled spark-shell to connect.

wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
tar xvzf spark-2.1.0-bin-hadoop2.7.tgz
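
In case that CloudFront link goes away, the same release should also be available from the Apache archive (this mirror path is an assumption, so double-check it before relying on it):

wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz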

Go to the bin directory of the freshly extracted tarball and connect to the Spark cluster using the master IP obtained above:

$ cd spark-2.1.0-bin-hadoop2.7/bin
$ ./spark-shell --master spark://172.18.0.2:7077 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/02/28 19:57:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/28 19:57:07 WARN Utils: Your hostname, mean-machine resolves to a loopback address: 127.0.0.1; using 10.20.4.35 instead (on interface eth0)
17/02/28 19:57:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/28 19:57:28 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/28 19:57:28 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/28 19:57:30 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.20.4.35:4040
Spark context available as 'sc' (master = spark://172.18.0.2:7077, app id = app-20170228142708-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
 
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
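
Once you have the prompt, a tiny job is enough to confirm that work is actually being shipped to the Dockerized workers; any small action will do, for example:

scala> sc.parallelize(1 to 100000).count()
res0: Long = 100000

While the shell is connected, the application should also show up under Running Applications on the master UI at port 8080.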

You are good to go. I hope this helped.

Do share your experiences with these steps.

The next step would be to run the Spark client inside a Docker instance as well, but that has some time to come. Until then, I will go ahead with an externally installed Spark client.

Hasta-La-Vista,

Abhay Dandekar

 

 

17 thoughts on "How to start a Spark cluster in minutes"

  1. I am encountering the following error, can you help me?

    17/05/06 10:48:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
    17/05/06 10:49:24 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
    17/05/06 10:49:24 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
    17/05/06 10:49:24 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
    17/05/06 10:49:24 ERROR SparkContext: Error initializing SparkContext.
    java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
    at org.apache.spark.SparkContext.(SparkContext.scala:524)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
    at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
    at $line3.$read$$iw$$iw.(:15)
    at $line3.$read$$iw.(:42)
    at $line3.$read.(:44)
    at $line3.$read$.(:48)
    at $line3.$read$.()
    at $line3.$eval$.$print$lzycompute(:7)
    at $line3.$eval$.$print(:6)
    at $line3.$eval.$print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
    at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
    at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
    at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
    at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
    at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:105)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
    at org.apache.spark.repl.Main$.doMain(Main.scala:68)
    at org.apache.spark.repl.Main$.main(Main.scala:51)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
    at org.apache.spark.SparkContext.(SparkContext.scala:524)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
    at org.apache.spark.repl.Main$.createSparkSession(Main.scala:95)
    … 47 elided
    :14: error: not found: value spark
    import spark.implicits._
    ^
    :14: error: not found: value spark
    import spark.sql
    ^


    1. Seems your worker node is unable to reach the master.
      Do the following checks:
      1. Check the reachability of your master node from the worker. You must provide the master URL when starting the worker node.
      2. Check the logs: whenever the worker connects, it should be reflected in both the master and worker logs.

      Hope that will resolve your issue. 🙂
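
      For example, something along these lines (using the container names from the post above; adjust them to yours):

      $ docker logs spark_master_1 | grep -i worker
      $ docker logs spark_worker_1 | grep -i master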


  2. Hello

    Thank you for this explanation 🙂
    I followed your steps but I got this error 😦

    ./spark-shell: No such file or directory

    Can you help me please 😦


    1. Hi 🙂 I found that I didn't execute this command: cd spark-2.1.0-bin-hadoop2.7/bin

      Now it works, but I found this error:

      … 47 elided
      :14: error: not found: value spark
      import spark.implicits._
      ^
      :14: error: not found: value spark
      import spark.sql
      ^
      Can you help me please 😦


      1. Hi, it seems the Spark context did not instantiate due to some error. Hmmm... it's a bit difficult to figure out what happened from the log you provided.
        Can you please paste the earlier logs here? Do check if something went wrong before this statement.

        Thanks,
        ~Abhay Dandekar


      2. Thank you for replying 🙂
        ./spark-shell --master spark://172.18.0.2:7077
        Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        Setting default log level to "WARN".
        To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
        17/07/11 18:58:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        17/07/11 18:59:36 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
        17/07/11 18:59:36 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
        17/07/11 18:59:36 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
        17/07/11 18:59:36 ERROR SparkContext: Error initializing SparkContext.
        java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
        at org.apache.spark.SparkContext.(SparkContext.scala:524)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
        at org.apache.spark.repl.Main$.createSparkSession(Main.scala:96)
        at $line3.$read$$iw$$iw.(:15)
        at $line3.$read$$iw.(:42)
        at $line3.$read.(:44)
        at $line3.$read$.(:48)
        at $line3.$read$.()
        at $line3.$eval$.$print$lzycompute(:7)
        at $line3.$eval$.$print(:6)
        at $line3.$eval.$print()
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
        at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:105)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:69)
        at org.apache.spark.repl.Main$.main(Main.scala:52)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
        at org.apache.spark.SparkContext.(SparkContext.scala:524)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2320)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
        at org.apache.spark.repl.Main$.createSparkSession(Main.scala:96)
        … 47 elided
        :14: error: not found: value spark
        import spark.implicits._
        ^
        :14: error: not found: value spark
        import spark.sql
        ^
        Welcome to
              ____              __
             / __/__  ___ _____/ /__
            _\ \/ _ \/ _ `/ __/ '_/
           /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
              /_/

        Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
        Type in expressions to have them evaluated.
        Type :help for more information.

        scala>


  3. Hi 🙂
    I solved this problem by correcting two things in your configuration:
    1/ I replaced the port 7070 with 7077 in the docker-compose.yml
    2/ I replaced the IP address in (./spark-shell --master spark://172.18.0.2:7077) with localhost, because it is a private IP address.


  4. Thanks for the blog post!

    The spark shell on the master node can be accessed like this:
    docker exec -it dockerspark_master_1 /bin/bash
    spark-shell

