Deploy Applications to Spark Using the MATLAB API for Spark
Supported Platform: Linux® only.
This example shows you how to deploy a standalone application to Spark™ using the MATLAB® API for Spark. Your application can be deployed against Spark using one of two supported cluster managers: local and Hadoop® YARN. This example walks through deployment with both cluster managers. For a discussion on cluster managers, see Cluster Managers Supported by Spark.
Goal: Count the number of unique airlines in the given dataset.
Dataset: airlinesmall.csv
Description: Airline departure and arrival information from 1987-2008.
Location: /usr/local/MATLAB/R2024b/toolbox/matlab/demos
Helper Function
Create a MATLAB file named carrierToCount.m with the following code:
function results = carrierToCount(input)
    % Each input element is a cell array wrapping a table of records
    tbl = input{1};
    intermKeys = tbl.UniqueCarrier;
    % Count the occurrences of each unique carrier in this chunk
    [intermKeys, ~, idx] = unique(intermKeys);
    intermValues = num2cell(accumarray(idx, ones(size(idx))));
    % Return a cell array of {carrier, count} key-value pairs
    results = cellfun( @(x,y) {x,y} , ...
        intermKeys, intermValues, ...
        'UniformOutput', false);
end
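To see what the helper returns, you can exercise it locally on a small table before deploying. The snippet below is a minimal sketch, assuming each RDD element arrives as a cell array wrapping a table with a UniqueCarrier variable (the sample carrier values are illustrative only):

% Hypothetical local test of carrierToCount (not part of the deployed code)
t = table({'AA';'AA';'WN';'AA';'WN'}, 'VariableNames', {'UniqueCarrier'});
pairs = carrierToCount({t});   % mimic the cell-wrapped table the RDD passes in
celldisp(pairs)                % expected: pairs{1} = {'AA',3}, pairs{2} = {'WN',2}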
Note
If you are using Spark version 1.6 or higher, increase the Java® heap size in MATLAB to at least 512 MB. For information on how to increase the Java heap size in MATLAB, see Java Heap Memory Preferences.
Local
A local cluster manager represents a pseudo Spark enabled cluster and works in a non-distributed mode on a single machine. It can be configured to use one worker thread, or on a multi-core machine, multiple worker threads. In applications, it is denoted by the word local.

A local cluster manager is handy for debugging your application prior to full-blown deployment on a Spark enabled Hadoop cluster.
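The number of worker threads is controlled through the Master value passed to SparkConf. The strings below follow Spark's convention for local masters; only 'local[1]' is used in this example, and sparkProp refers to the containers.Map of Spark properties created in the Procedure below:

% Illustrative Master values for the local cluster manager
% 'local'      - one worker thread, no parallelism
% 'local[4]'   - four worker threads
% 'local[*]'   - one worker thread per available core
conf = matlab.compiler.mlspark.SparkConf('AppName','debugRun', ...
    'Master','local[4]', 'SparkProperties',sparkProp);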
Prerequisites
Start this example by creating a new work folder that is visible to the MATLAB search path.
Create the helper function carrierToCount.m mentioned above.
Procedure
Specify Spark properties.
Use a containers.Map object to specify Spark properties.

sparkProp = containers.Map(...
    {'spark.executor.cores',...
    'spark.matlab.worker.debug'},...
    {'1',...
    'true'});
Spark properties describe the Spark execution environment of the application being deployed. Every application must be configured with specific Spark properties before it can be deployed.
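Because sparkProp is an ordinary containers.Map, additional properties can be added or overridden before the SparkConf object is constructed. A brief sketch with illustrative values:

% Add or override Spark properties in the existing map (values are examples)
sparkProp('spark.executor.memory') = '2g';
sparkProp('spark.driver.memory')   = '2g';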
For more information on Spark properties, expand the prop value of the 'SparkProperties' name-value pair in the Input Arguments section of the SparkConf class.

Create a SparkConf object.

Use the class matlab.compiler.mlspark.SparkConf to create a SparkConf object. A SparkConf object stores the configuration parameters of the application being deployed to Spark. The configuration parameters of an application are passed on to a Spark cluster through a SparkContext.

conf = matlab.compiler.mlspark.SparkConf(...
    'AppName', 'mySparkAppDepLocal', ...
    'Master', 'local[1]', ...
    'SparkProperties', sparkProp);
For more information on SparkConf, see matlab.compiler.mlspark.SparkConf.

Create a SparkContext object.

Use the class matlab.compiler.mlspark.SparkContext with the SparkConf object as an input to create a SparkContext object.

sc = matlab.compiler.mlspark.SparkContext(conf);
A SparkContext object serves as an entry point to Spark by initializing a connection to a Spark cluster. It accepts a SparkConf object as an input argument and uses the parameters specified in that object to set up the internal services necessary to establish a connection to the Spark execution environment.

For more information on SparkContext, see matlab.compiler.mlspark.SparkContext.

Create an RDD object from the data.

Use the MATLAB function datastore to create a datastore object pointing to the file airlinesmall.csv. Then use the SparkContext method datastoreToRDD to convert the datastore object to a Spark RDD object.

% Create a MATLAB datastore (LOCAL)
ds = datastore('airlinesmall.csv',...
    'TreatAsMissing','NA', ...
    'SelectedVariableNames','UniqueCarrier');

% Convert MATLAB datastore to Spark RDD
rdd = sc.datastoreToRDD(ds);
In general, input RDDs can be created using the following methods of the SparkContext class: parallelize, datastoreToRDD, and textFile.
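For reference, here is a minimal sketch of the other two creation methods; the input values are illustrative and are not used in the rest of this example:

% Create an RDD from an in-memory cell array
numRDD = sc.parallelize({1,2,3,4,5});

% Create an RDD where each element is one line of a text file
lineRDD = sc.textFile('airlinesmall.csv');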
Perform operations on the RDD object.

Use a Spark RDD method such as flatMap to apply a function to all elements of the RDD object and flatten the results. The function carrierToCount that was created earlier serves as the function that is going to be applied to the elements of the RDD. A function handle to the function carrierToCount is passed as an input argument to the flatMap method.

maprdd = rdd.flatMap(@carrierToCount);
redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
countdata = redrdd.collect();

% Count and display carrier occurrences
count = 0;
for i=1:numel(countdata)
    count = count + countdata{i}{2};
    fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
end
fprintf('\n Total count : %d\n', count);

% Delete Spark Context
delete(sc)
In general, you will provide MATLAB function handles or anonymous functions as input arguments to Spark RDD methods known as transformations and actions. These function handles and anonymous functions are executed on the workers of the deployed application.
For a list of supported RDD transformations and actions, see Transformations and Actions in the Methods section of the RDD class.

For more information on transformations and actions, see Apache Spark Basics.
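As a further illustration of transformations and actions, the following sketch filters the reduced key-value RDD before collecting it. It assumes filter and count are available on the RDD class (see its Methods section for the authoritative list) and that each element is a {carrier, count} cell pair, as in the collect loop above:

% Keep only carriers with more than 10000 flights (illustrative threshold)
bigrdd  = redrdd.filter(@(kv) kv{2} > 10000);   % transformation: evaluated lazily
numBig  = bigrdd.count();                        % action: triggers execution
bigdata = bigrdd.collect();                      % action: returns a cell array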
Create a standalone application.
Use the mcc command with the -m flag to create a standalone application. The -m flag creates a standard executable that can be run from a command line. The -a flag includes the dependent dataset airlinesmall.csv from the folder <matlabroot>/toolbox/matlab/demos. The mcc command automatically picks up the dependent file carrierToCount.m as long as it is in the same work folder.

>> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv
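If you want a different executable name or a separate output folder, mcc also accepts the standard -o and -d flags; a brief sketch (the output names here are arbitrary):

>> mcc -m deployToSparkMlApiLocal.m -o sparkCarrierCount -d ./output ...
       -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv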
The mcc command creates a shell script run_deployToSparkMlApiLocal.sh to run the executable file deployToSparkMlApiLocal.

For more information, see mcc.

Run the standalone application from a Linux shell using the following command:
$ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/v91
/share/MATLAB/MATLAB_Runtime/v91 is an argument indicating the location of the MATLAB Runtime.

Prior to executing the above command, make sure the javaclasspath.txt file is in the same folder as the shell script and the executable. Your application will fail to execute if it cannot find the file javaclasspath.txt.

Your application may also fail to execute if the optional line containing the folder location of the Hadoop configuration files is uncommented. To execute your application on the local cluster manager, this line must be commented. This line should only be uncommented if you plan on running your application using yarn-client as your cluster manager on a Spark enabled Hadoop cluster.
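For orientation, a javaclasspath.txt for the local case might look roughly like the sketch below; the JAR path is hypothetical and depends on your Spark installation, and the Hadoop configuration folder line is left commented:

# javaclasspath.txt (illustrative layout only; entries depend on your installation)
/share/spark/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar
# /share/hadoop/hadoop-2.6.0/etc/hadoop   <- uncomment only when using yarn-client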
You will see the following output:

Carrier Name: 9E, Count: 521
Carrier Name: AA, Count: 14930
Carrier Name: AQ, Count: 154
Carrier Name: AS, Count: 2910
Carrier Name: B6, Count: 806
Carrier Name: CO, Count: 8138
...
Carrier Name: US, Count: 13997
Carrier Name: WN, Count: 15931
Carrier Name: XE, Count: 2357
Carrier Name: YV, Count: 849

Total count : 123523
Hadoop YARN
A yarn-client cluster manager represents a Spark enabled Hadoop cluster. A YARN cluster manager was introduced in Hadoop 2.0. It is typically installed on the same nodes as HDFS™. Therefore, running Spark on YARN lets Spark access HDFS data easily. In applications, it is denoted using the word yarn-client.

Since the steps for deploying your application using yarn-client as your cluster manager are similar to using the local cluster manager shown above, the steps are presented with minimal discussion. For a detailed discussion of each step, check the Local case above.
Note
You can follow the same instructions to deploy Spark applications created using the MATLAB API for Spark to Cloudera® CDH. For an example, see MATLAB Answers™.

To use Cloudera CDH encryption zones, add the JAR file commons-codec-1.9.jar to the static classpath of the MATLAB Runtime. Location of the file: $HADOOP_PREFIX/lib/commons-codec-1.9.jar, where $HADOOP_PREFIX is the location where Hadoop is installed.
Prerequisites
Start this example by creating a new work folder that is visible to the MATLAB search path.
Install the MATLAB Runtime in a folder that is accessible by every worker node in the Hadoop cluster. This example uses /share/MATLAB/MATLAB_Runtime/v91 as the location of the MATLAB Runtime folder.

If you don't have the MATLAB Runtime, you can download it from the website at: https://www.mathworks.com/products/compiler/mcr.

Copy the file airlinesmall.csv into the Hadoop Distributed File System (HDFS) folder /user/<username>/datasets. Here <username> refers to your username in HDFS.

$ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets
Procedure
Set up the environment variable HADOOP_PREFIX to point at your Hadoop installation folder. This setting is necessary for submitting jobs to your Hadoop cluster.

setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')

The HADOOP_PREFIX environment variable must be set when using the MATLAB datastore function to point to data on HDFS. Setting this environment variable has nothing to do with Spark. See Relationship Between Spark and Hadoop for more information.
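As a quick, optional sanity check, you can confirm that the variable is visible to MATLAB before continuing:

% Should return '/share/hadoop/hadoop-2.6.0'
getenv('HADOOP_PREFIX')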
Specify Spark properties.

Use a containers.Map object to specify Spark properties.

sparkProperties = containers.Map( ...
    {'spark.executor.cores',...
    'spark.executor.memory',...
    'spark.yarn.executor.memoryOverhead',...
    'spark.dynamicAllocation.enabled',...
    'spark.shuffle.service.enabled',...
    'spark.eventLog.enabled',...
    'spark.eventLog.dir'}, ...
    {'1',...
    '2g',...
    '1024',...
    'true',...
    'true',...
    'true',...
    'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});
For more information on Spark properties, expand the prop value of the 'SparkProperties' name-value pair in the Input Arguments section of the SparkConf class.

Create a SparkConf object.

Use the class matlab.compiler.mlspark.SparkConf to create a SparkConf object.

conf = matlab.compiler.mlspark.SparkConf( ...
    'AppName','myApp', ...
    'Master','yarn-client', ...
    'SparkProperties',sparkProperties);
For more information on SparkConf, see matlab.compiler.mlspark.SparkConf.

Create a SparkContext object.

Use the class matlab.compiler.mlspark.SparkContext with the SparkConf object as an input to create a SparkContext object.

sc = matlab.compiler.mlspark.SparkContext(conf);
For more information on SparkContext, see matlab.compiler.mlspark.SparkContext.

Create an RDD object from the data.

Use the MATLAB function datastore to create a datastore object pointing to the file airlinesmall.csv in HDFS. Then use the SparkContext method datastoreToRDD to convert the datastore object to a Spark RDD object.

% Create a MATLAB datastore (HADOOP)
ds = datastore(...
    'hdfs:///user/<username>/datasets/airlinesmall.csv',...
    'TreatAsMissing','NA',...
    'SelectedVariableNames','UniqueCarrier');

% Convert MATLAB datastore to Spark RDD
rdd = sc.datastoreToRDD(ds);
In general, input RDDs can be created using the following methods of the SparkContext class: parallelize, datastoreToRDD, and textFile.
Perform operations on the RDD object.

Use a Spark RDD method such as flatMap to apply a function to all elements of the RDD object and flatten the results. The function carrierToCount that was created earlier serves as the function that is going to be applied to the elements of the RDD. A function handle to the function carrierToCount is passed as an input argument to the flatMap method.

maprdd = rdd.flatMap(@carrierToCount);
redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
countdata = redrdd.collect();

% Count and display carrier occurrences
count = 0;
for i=1:numel(countdata)
    count = count + countdata{i}{2};
    fprintf('\nCarrier Code: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
end
fprintf('\n Total count : %d\n', count);

% Save results to MAT file
save('countdata.mat','countdata');

% Delete Spark Context
delete(sc);
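After the run, the saved MAT-file can be reloaded on the driver machine and reshaped for further analysis. A minimal sketch, assuming countdata keeps the {carrier, count} cell layout shown above:

% Reload the saved results and convert them to a table (illustrative)
S = load('countdata.mat');
carriers = cellfun(@(c) c{1}, S.countdata, 'UniformOutput', false);
counts   = cellfun(@(c) c{2}, S.countdata);
resultsTbl = table(carriers(:), counts(:), 'VariableNames', {'Carrier','Count'})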
For a list of supported RDD transformations and actions, see Transformations and Actions in the Methods section of the RDD class.

For more information on transformations and actions, see Apache Spark Basics.
Create a standalone application.
Use the mcc command with the -m flag to create a standalone application. The -m flag creates a standalone application that can be run from a command line. You do not need to attach the dataset airlinesmall.csv since it resides on HDFS. The mcc command automatically picks up the dependent file carrierToCount.m as long as it is in the same work folder.

>> mcc -m deployToSparkMlApiHadoop.m
The mcc command creates a shell script run_deployToSparkMlApiHadoop.sh to run the executable file deployToSparkMlApiHadoop.

For more information, see mcc.

Run the standalone application from a Linux shell using the following command:
$ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/v91
/share/MATLAB/MATLAB_Runtime/v91 is an argument indicating the location of the MATLAB Runtime.

Prior to executing the above command, make sure the javaclasspath.txt file is in the same folder as the shell script and the executable. Your application will fail to execute if it cannot find the file javaclasspath.txt.

Your application may also fail to execute if the optional line containing the folder location of the Hadoop configuration files is commented. To execute your application on a yarn-client cluster manager, this line must be uncommented. This line should only be commented if you plan on running your application using a local cluster manager.
You will see the following output:
Carrier Name: 9E, Count: 521
Carrier Name: AA, Count: 14930
Carrier Name: AQ, Count: 154
Carrier Name: AS, Count: 2910
Carrier Name: B6, Count: 806
Carrier Name: CO, Count: 8138
...
Carrier Name: US, Count: 13997
Carrier Name: WN, Count: 15931
Carrier Name: XE, Count: 2357
Carrier Name: YV, Count: 849

Total count : 123523
Note
If the application being deployed is a MATLAB function as opposed to a MATLAB script, use the following execution syntax:
$ ./run_<applicationName>.sh \
  <MATLAB_Runtime_Location> \
  [Spark arguments] \
  [Application arguments]

For example:

$ ./run_deployToSparkMlApiHadoop.sh \
  /usr/local/MATLAB/MATLAB_Runtime/v91 \
  yarn-client \
  hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
  hdfs://host:54310/user/<username>/result
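For context, a deployable MATLAB function matching that calling pattern might be structured roughly as below. The function name, the argument handling, and the saveAsTextFile call are assumptions for illustration, not the shipped example:

function deployToSparkCountCarriers(master, inputURI, outputURI)
    % master    - e.g. 'yarn-client' (Spark argument)
    % inputURI  - HDFS path of the input CSV (application argument)
    % outputURI - HDFS path for the results (application argument)
    sparkProp = containers.Map({'spark.executor.cores'},{'1'});
    conf = matlab.compiler.mlspark.SparkConf('AppName','myApp', ...
        'Master',master, 'SparkProperties',sparkProp);
    sc = matlab.compiler.mlspark.SparkContext(conf);
    ds = datastore(inputURI, 'TreatAsMissing','NA', ...
        'SelectedVariableNames','UniqueCarrier');
    rdd = sc.datastoreToRDD(ds);
    redrdd = rdd.flatMap(@carrierToCount).reduceByKey(@(acc,value) acc+value);
    redrdd.saveAsTextFile(outputURI);   % assumed RDD method for writing results
    delete(sc);
end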