%% BIODISTCOMPDEMO Batch processing through distributed computing 
% This demonstration illustrates how you can set up a cluster of computers to
% preprocess a large set of mass spectrometry signals.

%   Copyright 2003-2004 The MathWorks, Inc.
%   $Revision: 1.1.10.1.2.1 $  $Date: 2005/01/19 21:54:14 $

% Stop here when playbiodemo returns true: open in the editor or run step-by-step.
if playbiodemo
    return
end

%% Introduction
% This demonstration shows the steps required to set up a cluster of computers
% to work in parallel to preprocess a large number of mass spectrograms.
% Batch processing adapts to a single program multiple data (SPMD) parallel
% computing model, and it is best suited for the *Distributed Computing
% Toolbox*. 

%%
% The signals to preprocess come from protein surface-enhanced laser
% desorption/ionization-time of flight (SELDI-TOF) mass spectra. The data
% in this example are from the FDA-NCI Clinical Proteomics Program Databank
% (http://home.ccr.cancer.gov/ncifdaproteomics/ ). In particular, the demo
% uses the high-resolution ovarian cancer data set that was generated using
% the WCX2 protein array. 

%% Setting the repository for the data
% The demonstration assumes that you have already downloaded and
% uncompressed the datasets into your local repository. Ideally you should
% place the data set in a network drive such that all your worker machines
% can have access to it. This will minimize the data transfer through the
% job manager. 
 
%%
% First, get the name and full path to all the data sets.
 
% Root folder of the data repository and its two class sub-folders.
repository = 'F:/MassSpecRepository/OvarianCD_PostQAQC/'; % <= change this to your repository
repositoryC = [repository 'Cancer/'];
repositoryN = [repository 'Normal/'];

% Both classes are stored as text files, so one pattern serves both listings.
txtPattern = '*.txt';
filesCancer = dir([repositoryC txtPattern]);
filesNormal = dir([repositoryN txtPattern]);

% Left unsuppressed on purpose so the counts are displayed.
NumberCancerDatasets = numel(filesCancer)
NumberNormalDatasets = numel(filesNormal)

%%
% Put all the filenames to process into a single variable.
% Fail early with a clear message when the repository path is wrong or empty;
% otherwise the first files{k} access further down fails with an obscure
% indexing error.
if isempty(filesCancer) && isempty(filesNormal)
    error('biodistcompdemo:NoDataFound', ...
          'No *.txt data sets were found under %s. Check the repository path.', ...
          repository);
end

% Full path of every data set: cancer files first, then normal files.
files = [ strcat(repositoryC,{filesCancer.name}) ...
          strcat(repositoryN,{filesNormal.name})];
N = numel(files)   % total number of files (displayed on purpose)

%% Sequential batch processing
% Before launching the parallel processing engine, you need to test your
% algorithms locally with a for-loop. 

%%
% Write an M-file function with the set of instructions that need to be
% applied to every data set. The input argument is the filename and the
% output arguments are the M/Z vector and the preprocessed signal, in that
% order. For example: 
% Display the source of the batch processing function.
type('msbatchprocessing')

%%
% To run the batch processing function sequentially you only need to call
% it within a loop. For the demo purposes, we only preprocess two
% spectrograms and store them in the Y matrix.
 
% Number of spectrograms to preprocess locally; set it to N to do all.
% The same value sizes Y and bounds the loop, so changing it here keeps the
% preallocation valid.
numSequential = 2;

% Preset the size of Y for memory performance.
% NOTE(review): 15000 rows assumes msbatchprocessing returns 15000-point
% signals -- confirm against msbatchprocessing.m.
Y = zeros(15000,numSequential);
for k = 1:numSequential
    % First output is the M/Z vector, second is the preprocessed signal.
    [MZ,Y(:,k)] = msbatchprocessing(files{k});
end
 
%% Distributed computing batch processing
% Find a job manager using *findResource*.
% Query the available job managers once and reuse the result, so both
% listings describe the same set of managers and discovery is not repeated.
jobManagers = findResource('jobmanager');
get(jobManagers,'HostAddress')
get(jobManagers,'Name')

%%
% Pick one of them (ask your system administrator which one you can use),
% or if none is present refer to the *Distributed Computing Toolbox*
% documentation to see how to start your own job manager. In this example
% we select the 'BIOINFO_jm' job manager, which contains eight machines in
% the cluster, all in idle state.
% Look up the job manager by name.
msmgr = findResource('jobmanager','Name','BIOINFO_jm');

% Stop with an explicit message when the manager is not available, instead
% of failing later on an empty object.
if isempty(msmgr)
    error('biodistcompdemo:JobManagerNotFound', ...
          'Job manager ''BIOINFO_jm'' was not found. Change the name to one of the job managers listed above.');
end

% Show the manager properties and the names of its idle workers.
get(msmgr)
get(msmgr.IdleWorkers,'Name')

%% 
% If you have written your own batch processing function, you should
% include it in the variable file_dep to make sure it is transmitted to the
% workers.
% Files to transmit to the workers, given as a cell array of strings so more
% entries (for example a toolbox directory) can be appended.
file_dep = {'msbatchprocessing.m'}

%% 
% Additionally, if the latest version of the *Bioinformatics Toolbox* is
% not installed in the worker machines you can indicate the path to the
% required functions.

%fullPathToLocalBioinfoToolbox = fileparts(which('msresample.m'));
%file_dep = { 'msbatchprocessing.m', fullPathToLocalBioinfoToolbox};

%% 
% Create one job with one task for each spectrogram and submit the job to
% the manager.
% Create the job on the selected manager and attach the file dependencies.
msjob = createJob(msmgr);
set(msjob,'FileDependencies',file_dep);

% One task per file: each task calls msbatchprocessing with a single
% filename and is asked for two output arguments.
for taskIdx = 1:N
    mstask(taskIdx) = createTask(msjob,@msbatchprocessing,2,files(taskIdx));
end

% Hand the job over to the job manager.
submit(msjob)

%%
% Once the job is submitted, loop over its tasks to collect the preprocessed
% spectrograms. You grab every data set as soon as its respective task is
% finished. 
% Collect the result of every submitted task. N is the total number of files
% (one task was created per file); it is no longer overwritten with 2, which
% discarded all but the first two spectrograms.
% NOTE(review): 15000 rows assumes msbatchprocessing returns 15000-point
% signals -- confirm against msbatchprocessing.m.
Y = zeros(15000,N); % need to preset the size of Y for memory performance
for k = 1:N
    waitForState(mstask(k), 'finished')
    taskOutputs = mstask(k).OutputArguments;

    % Report a task that did not return its two outputs with a clear message
    % instead of an indexing error.
    if numel(taskOutputs) < 2
        error('biodistcompdemo:TaskFailed', ...
              'Task %d finished without returning its two outputs; inspect the task for errors.', k);
    end

    if k == 1 % MZ is the same for all spectrograms, we get it only once
        MZ = taskOutputs{1};
    end
    Y(:,k) = taskOutputs{2};
end

% All results are now local; remove the job and its data from the job manager.
destroy(msjob)

%%
% After collecting all the data you can use it locally. For example, you
% can apply group normalization and save it, because this preprocessed data
% is used in *cancerdetectdemo*. 

% Options passed to msnorm, kept together as one name/value list.
normOptions = {'QUANTILE',0.5,'LIMITS',[3500 11000],'MAX',50};
Y = msnorm(MZ,Y,normOptions{:});

% Store the normalized spectrograms and their M/Z vector in a MAT-file.
save('OvarianCancerQAQCdataset.mat','Y','MZ');

