Mapreduce does not seem to use all available cores

Hello,
I am using mapreduce on a machine with 16 cores. I make a pool with 15 workers (cores) which works fine. When I run mapreduce though, it only utilizes one or two workers: sometimes one for the mapper and one for the reducer. This is how I check which worker is processing the data (in addition to using a system monitor to watch CPU/core activities):
tk = getCurrentTask();
disp(tk.ID)
There are tens of files to be processed, and each mapper call loads and processes one file. I expect that while the first mapper call is loading and processing the first file on one worker (core), there are parallel mapper calls processing the next files on other workers. However, that is not what happens; mapreduce just calls the mapper sequentially on the same worker. Sometimes it uses a second worker for the reducer calls. So it uses at most two workers, while there are 15 available in the pool.
What would be a simple code to check if mapreduce is making use of all the available cores?
EDIT: Actually now I can confirm that the mapper is always run by a single worker, but the reducer may be run by a few different workers, as expected.
Your help is appreciated, Mehrdad

10 Comments

The steps you provide should be sufficient for checking how many cores are being used. From what you describe, the ID was the same for every map call. That is not the expected behavior: each ID from 1 to 15 should appear at least once when there is enough data (and having more than 15 files in the input is one of several ways to ensure this).
How big are the input files, and roughly how long does the mapreduce call take to complete?
Thank you, Rick! I have tens of files to be processed, and each is about 40 MB in size. The entire task can take a few hours to complete.
Actually now I can confirm that the mapper is always run by a single worker, but the reducer may be run by a few different workers, as expected.
It's interesting that my mapper needs to do most of the work, and my reducer is just doing some simple bookkeeping.
Several further questions: does the function handle for the mapper have attached data? For example, is it constructed with:
mapFunction = @(data, info, output) myFunction(data, info, output, someOtherConstantData);
If so, how large is this attached data? One way to check how much data is attached to a handle is the following:
f = functions(mapFunction);
whos f
f.workspace{:}
If this last line errors, there is no attached data. Otherwise, the data in each of the fields shown in the output is attached to the function handle. The output of whos gives a rough estimate of its size.
Also for clarification, when you say tens of files, I assume you mean a number in the range 10 to 100? Or is the number of files far larger, for example 1000+ files or 10000+ files?
Actually, I have now come up with a simple example that illustrates this problem (adapted from the example in Getting Started with MapReduce). Running the following code (also attached) on my system shows that only one worker runs the mapper function. Note the single value 9 for every occurrence of the key 'MapperTaskID' in the output.
Output:
Key                Value
_______________    ________
'ReducerTaskID'    [      9]
'Mean'             [ 702.16]
'ReducerTaskID'    [      7]
'MapperTaskID'     [      9]
'MapperTaskID'     [      9]
'MapperTaskID'     [      9]
'MapperTaskID'     [      9]
'MapperTaskID'     [      9]
'MapperTaskID'     [      9]
...
The testing code:
function keyvalues = workers_test
    ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
    ds.SelectedVariableNames = 'Distance';
    ds.RowsPerRead = 5000;  % smaller values increase the number of mapper calls
    preview(ds)
    outds = mapreduce(ds, @MeanDistMapFun, @MeanDistReduceFun);
    keyvalues = readall(outds);
end

function MeanDistMapFun(data, info, intermKVStore)
    tk = getCurrentTask();
    add(intermKVStore, 'MapperTaskID', tk.ID);
    distances = data.Distance(~isnan(data.Distance));
    sumLenValue = [sum(distances) length(distances)];
    add(intermKVStore, 'sumAndLength', sumLenValue);
end

function MeanDistReduceFun(intermKey, intermValIter, outKVStore)
    tk = getCurrentTask();
    add(outKVStore, 'ReducerTaskID', tk.ID);
    if strcmp(intermKey, 'MapperTaskID')
        while hasnext(intermValIter)  % pass the same key/values along
            add(outKVStore, intermKey, getnext(intermValIter));
        end
        return
    end
    sumLen = [0 0];
    while hasnext(intermValIter)
        sumLen = sumLen + getnext(intermValIter);
    end
    add(outKVStore, 'Mean', sumLen(1)/sumLen(2));
end
This example hits a separate limitation: the input data currently needs to be "large" to provide meaningful parallelism. As of R2014b, "large" means either a single file larger than 32 MB or a collection of files. If you change workers_test to the following, you should see many IDs in the output:
function keyvalues = workers_test
    files = repmat({'airlinesmall.csv'}, 1, 15);
    ds = datastore(files, 'TreatAsMissing', 'NA');
    ds.SelectedVariableNames = 'Distance';
    ds.RowsPerRead = 5000;  % smaller values increase the number of mapper calls
    preview(ds)
    outds = mapreduce(ds, @MeanDistMapFun, @MeanDistReduceFun);
    keyvalues = readall(outds);
end
> This example hits a separate limitation: the input data currently needs to be "large" to provide meaningful parallelism.
I guess this limitation is behind the problem I am having. I have about 600 files to be processed. The files are about 40 MB on average (ranging from 5 MB to 130 MB). All of them are in .mat format, each containing exactly four structs that hold the data, metadata, etc. So the actual "data" table in each file is inside a struct in that file. I wasn't sure if it is possible to make datastores directly from these tables inside the structs. So instead I pass the datastore a text file containing the 600 .mat filenames as input. (And set ds.RowsPerRead = 1 to go through the filenames one by one.)
Then as I mentioned in the original post "each time a mapper is called it loads and processes one file."
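A minimal sketch of that mapper pattern, for concreteness. The column name FileName, the struct field dataStruct, and the helper processOneFile are placeholders I have assumed, not the actual code from the original post:

```matlab
% Hypothetical sketch of the pattern described above: the datastore reads a
% text file of .mat filenames, and each mapper call loads one file.
% 'FileName', 'dataStruct', and 'processOneFile' are assumed names.
function LoadFileMapFun(data, info, intermKVStore)
    fname = char(data.FileName(1));      % one filename per call (RowsPerRead = 1)
    contents = load(fname);              % loads the structs stored in the .mat file
    result = processOneFile(contents.dataStruct);  % user-defined processing (placeholder)
    add(intermKVStore, fname, result);
end
```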
Given the limitation you mention, since the input to the mapper is just a filename, it will not provide parallelism.
  • Are there any setting options to change the assumption that a small input requires only a small amount of processing?
  • Or is there any way to make a datastore of the tables that are inside structs in the input files?
As of R2014b, it is not possible to remove this assumption or to create a datastore from .mat files containing structs (without modifying the files). Both of these items are on our radar.
The workaround here is to split the filenames across several text files. For example, if you create 15 text files each listing 40 of the .mat filenames, a datastore of these should result in parallelism at the map stage.
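A sketch of this workaround, assuming matFileNames is a cell array holding the 600 .mat filenames (the variable name and the list-file naming scheme are placeholders):

```matlab
% Split the filenames across 15 text files so the datastore sees multiple
% input files and can parallelize the map stage.
nLists    = 15;
groups    = reshape(matFileNames(:), [], nLists);  % one column per list file (600/15 = 40 each)
listFiles = cell(1, nLists);
for k = 1:nLists
    listFiles{k} = sprintf('filelist_%02d.txt', k);
    fid = fopen(listFiles{k}, 'w');
    fprintf(fid, '%s\n', groups{:, k});
    fclose(fid);
end
ds = datastore(listFiles);   % 15 input files -> up to 15 parallel mapper workers
ds.RowsPerRead = 1;          % one filename per mapper call, as before
```

The reshape assumes the file count divides evenly by 15; otherwise, pad the list or distribute the remainder manually.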
Thank you for the workaround, I will try it.
I would also appreciate it if you could let me know how the .mat files should be modified to make them suitable for datastores.
  • Should the .mat file contain only a single table and no other variables,
  • or should the table just not be inside a struct,
  • or ...?
Currently, the one very specific form of .mat file that can be read by datastore is the output of another mapreduce call. An unofficial shortcut that creates such a .mat file is the following:
data.Key = {'Test'};
data.Value = {struct('a', 'Hello World!', 'b', 42)};
save('myMatFile.mat', '-struct', 'data');
ds = datastore('myMatFile.mat');
readall(ds)
Thank you Rick! I found your reply here useful. So I thought it's good to have a separate thread for this tip.


 Accepted Answer

In R2014b, there is a limitation on the minimum size of data that can be parallelized. To avoid this limitation, the input datastore must contain at least one of the following:
  1. Multiple files, where each file will be handled in parallel.
  2. Files that are larger than 32 MB, where each 32 MB chunk will be handled in parallel.
If the input datastore contains a single small file, you will need to find a way to split that file into multiple files. For example, if the input datastore contains a single file listing many filenames (pointing to the actual data), you can split it into many files, each containing one or a few filenames, to ensure parallelism.
