This demonstration illustrates how to set up a cluster of computers to preprocess a large set of mass spectrometry signals in parallel.
Batch processing of this kind fits the single program, multiple data (SPMD) parallel computing model: the same preprocessing function is applied independently to every spectrogram, which makes it well suited to the Distributed Computing Toolbox.
The signals to preprocess are protein surface-enhanced laser desorption/ionization time-of-flight (SELDI-TOF) mass spectra. The data in this example are from the FDA-NCI Clinical Proteomics Program Databank (http://home.ccr.cancer.gov/ncifdaproteomics/). In particular, the demo uses the high-resolution ovarian cancer data set that was generated using the WCX2 protein array.
The demonstration assumes that you have already downloaded and uncompressed the data sets into your local repository. Ideally, place the data sets on a network drive that all your worker machines can access; this minimizes the data transferred through the job manager.
First, get the names and full paths of all the data sets.
repository = 'F:/MassSpecRepository/OvarianCD_PostQAQC/'; % <= change this to your repository
repositoryC = [repository 'Cancer/'];
repositoryN = [repository 'Normal/'];
filesCancer = dir([repositoryC '*.txt']);
NumberCancerDatasets = numel(filesCancer)
filesNormal = dir([repositoryN '*.txt']);
NumberNormalDatasets = numel(filesNormal)
NumberCancerDatasets =
120
NumberNormalDatasets =
93
Put all the filenames to process into a single variable.
files = [ strcat(repositoryC,{filesCancer.name})...
strcat(repositoryN,{filesNormal.name})];
N = numel(files) % total number of files
N = 213
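The variable files is a cell array, and the way it is indexed matters later in this demo: the sequential loop passes files{k} (the file name itself), while the task creation step passes files(k) (a 1-by-1 cell holding the file name). A minimal, self-contained sketch of the same construction follows. The folder and file names are hypothetical stand-ins for the real repository, and the struct array only mimics what dir returns.

```matlab
% Hypothetical folder and file names, for illustration only.
folder  = 'repo/Cancer/';
listing = struct('name', {'spectrum1.txt','spectrum2.txt'}); % mimics the struct array returned by dir

% strcat prepends the folder to every name and returns a cell array of strings.
paths = strcat(folder, {listing.name});

asChar = paths{1};   % brace indexing: the file name as a character array
asCell = paths(1);   % parenthesis indexing: a 1-by-1 cell containing the file name

disp(class(asChar))  % char
disp(class(asCell))  % cell
```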
Before launching the parallel processing engine, test your algorithms locally with a for-loop.
Write an M-file function with the set of instructions to apply to every data set. The input argument is the file name, and the output arguments are the M/Z vector and the preprocessed signal. For example:
type msbatchprocessing
function [MZ,Y] = msbatchprocessing(filename)
% function [MZ,Y] = msbatchprocessing(FILENAME)
%
% Demonstration function for batch processing in BIODISTCOMPDEMO,
% parameters in the preprocessing steps have been adjusted to deal with
% high-resolution spectrograms.

% read the two-column text file with mass-charge and intensity values
D = textread(filename);

% resample the signal to 15000 points between 710 and 11900
[MZ,YR] = msresample(D(:,1),D(:,2),15000,'RANGE',[710,11900]);

% align the spectrograms to two good reference peaks
P = [3883.766 7766.166];
YA = msalign(MZ,YR,P,'WIDTH',2);

% estimate and adjust the background
YB = msbackadj(MZ,YA,'STEP',50,'WINDOW',50);

% reduce the noise using a nonparametric filter
Y = mslowess(MZ,YB,'SPAN',5);
To run the batch processing function sequentially, you only need to call it within a loop. For demonstration purposes, only two spectrograms are preprocessed here and stored in the matrix Y.
Y = zeros(15000,2); % need to preset the size of Y for memory performance
for k = 1:2 % change to 1:N to do all
    [MZ,Y(:,k)] = msbatchprocessing(files{k});
end
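Timing this short sequential run gives a rough idea of how long all N files would take on one machine, which helps judge whether distributing the work is worthwhile. The following is a sketch only: it assumes files, N, and msbatchprocessing from the steps above, and the variable names nTest, Ytest, and tPerFile are introduced here for illustration.

```matlab
% Sketch: time a small sequential run to estimate the cost of processing all N files.
% Assumes files, N and msbatchprocessing are available as defined earlier in the demo.
nTest = 2;                       % number of spectrograms used for the estimate
Ytest = zeros(15000,nTest);      % preallocate, as in the loop above
tic
for k = 1:nTest
    [MZ,Ytest(:,k)] = msbatchprocessing(files{k});
end
tPerFile = toc/nTest;            % average seconds per spectrogram
fprintf('About %.1f s per file, roughly %.1f min for all %d files sequentially.\n', ...
        tPerFile, tPerFile*N/60, N);
```

The estimate is crude, since the first call often includes one-time overhead such as loading functions into memory.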
Find the available job managers using findResource.
get(findResource('jobmanager'),'HostAddress')
get(findResource('jobmanager'),'Name')
ans =
'144.212.XXX.XXX'
'144.212.XXX.XXX'
'144.212.XXX.XXX'
'144.212.XXX.XXX'
'144.212.XXX.XXX'
ans =
'STATS_JM'
'BIOINFO_JM'
'LAB1_jm'
'Test_Job_Manager'
'IMAGE_lab_JM'
Pick one of them (ask your system administrator which one you can use). If none is present, refer to the Distributed Computing Toolbox documentation to see how to start your own job manager. This example uses the 'BIOINFO_JM' job manager, which manages eight workers, all of them idle.
msmgr = findResource('jobmanager','Name','BIOINFO_JM');
get(msmgr)
get(msmgr.IdleWorkers,'Name')
Name: 'BIOINFO_JM'
Hostname: 'bioHost'
HostAddress: '144.212.XXX.XXX'
Jobs: [1x1 distcomp.job]
State: 'running'
NumberOfBusyWorkers: 0
BusyWorkers: [0x1 double]
NumberOfIdleWorkers: 8
IdleWorkers: [8x1 distcomp.worker]
ans =
'ENZYME_WORKER'
'PROTEASES_WORKER'
'AMYLASES_WORKER'
'LIPASES_WORKER'
'CELLULASES_WORKER'
'DNA_WORKER'
'RNA_WORKER'
'PROTEIN_WORKER'
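Before creating a job, it can be useful to confirm that the job manager was found and that at least one worker is idle. The check below is a minimal sketch that assumes msmgr from the step above and relies only on the NumberOfIdleWorkers property shown in the listing; the messages are illustrative.

```matlab
% Sketch: sanity-check the job manager selected above before creating a job.
if isempty(msmgr)
    % findResource returned nothing for the requested name
    error('Job manager BIOINFO_JM was not found.');
elseif msmgr.NumberOfIdleWorkers == 0
    % the job can still be submitted, but it waits until a worker is free
    warning('No idle workers are available; the job will wait in the queue.');
end
```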
If you have written your own batch processing function, you should include it in the variable file_dep to make sure it is transmitted to the workers.
file_dep = {'msbatchprocessing.m'};
Additionally, if the latest version of the Bioinformatics Toolbox is not installed on the worker machines, you can indicate the path to the required functions.
%fullPathToLocalBioinfoToolbox = fileparts(which('msresample.m'));
%file_dep = { 'msbatchprocessing.m', fullPathToLocalBioinfoToolbox};
Create one job with one task for each spectrogram and submit the job to the manager.
msjob = createJob(msmgr,'FileDependencies',file_dep);
for k = 1:N
    mstask(k) = createTask(msjob,@msbatchprocessing,2,files(k));
end
submit(msjob)
Once the job is submitted, loop over the tasks again to collect the preprocessed spectrograms. You grab every data set as soon as its respective task is finished.
Y = zeros(15000,N); % need to preset the size of Y for memory performance
for k = 1:N
    waitForState(mstask(k), 'finished')
    if k == 1 % MZ is the same for all spectrograms, we get it only once
        MZ = mstask(1).OutputArguments{1};
    end
    Y(:,k) = mstask(k).OutputArguments{2};
end
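An alternative to the per-task loop is to wait for the whole job and then retrieve every output in one call. The sketch below assumes msjob, mstask, and N from the steps above, that N is greater than 1 (so that get returns a cell array), and that each task returns its spectrum as a column vector, as the Y(:,k) assignment in the loop suggests. It also checks the ErrorMessage property of the tasks first, so that a failed task is reported instead of silently yielding empty outputs.

```matlab
% Sketch only: assumes msjob, mstask and N from the steps above.
waitForState(msjob, 'finished')            % block until every task in the job is done

% Report tasks that ended with an error before using their outputs.
errmsgs = get(mstask, 'ErrorMessage');     % cell array with one message per task
failed  = find(~cellfun('isempty', errmsgs));
if ~isempty(failed)
    error('%d task(s) failed; first failure is task %d: %s', ...
          numel(failed), failed(1), errmsgs{failed(1)});
end

results = getAllOutputArguments(msjob);    % N-by-2 cell array: {MZ, Y} for each task
MZ = results{1,1};                         % MZ is the same for all spectrograms
Y  = [results{:,2}];                       % concatenate the N column vectors side by side

destroy(msjob)                             % remove the job and its data from the job manager
```

Compared with the loop, this form is shorter but does not let you start working on early results while later tasks are still running.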
After collecting all the data, you can work with it locally. For example, you can apply group normalization and save the result; this preprocessed data is used in cancerdetectdemo.
Y = msnorm(MZ,Y,'QUANTILE',0.5,'LIMITS',[3500 11000],'MAX',50);
save OvarianCancerQAQCdataset.mat Y MZ