SAGA Adaptor CPI v.1.0
bulk_strategy_try_exec.cpp
Go to the documentation of this file.
00001 //  Copyright (c) 2005 Stephan Hirmer (shirmer@cct.lsu.edu)
00002 // 
00003 //  Distributed under the Boost Software License, Version 1.0. (See accompanying 
00004 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
00005 
00006 #include <utility>
00007 
00008 #include <saga/saga/base.hpp>
00009 #include <saga/saga/util.hpp>
00010 #include <saga/saga/adaptors/task.hpp>
00011 #include <saga/impl/runtime.hpp>
00012 #include <saga/saga/adaptors/bulk_strategy_try_exec.hpp>
00013 
00014 #include <saga/impl/engine/task_container.hpp>
00015 
00016 namespace saga
00017 {
00018     namespace adaptors
00019     {
00020         bulk_strategy_try_exec::bulk_strategy_try_exec()
00021         {
00022         }
00023         
00024         saga::task bulk_strategy_try_exec::get_first_task(saga::task_container& tc)
00025         {
00026             std::vector<saga::task> bulk_task_list = tc.list_tasks();
00027             std::vector<saga::task>::iterator tmp_first_task = 
00028                                                  bulk_task_list.begin();
00029             
00030             return *tmp_first_task;
00031         }
00032         
00033         bool bulk_strategy_try_exec::prepare_args_for_adaptor(saga::task_container& tc, 
00034                                       TR1::shared_ptr<saga::impl::v1_0::cpi> adp)
00035         {
00036             std::vector<saga::task> bulk_task_list = tc.list_tasks();
00037             std::vector<saga::task>::iterator it_ = bulk_task_list.begin();
00038             
00039             while ( it_ != bulk_task_list.end () )
00040             {
00041                 if( (saga::task_base::New == it_->get_state())
00042                  || (saga::task_base::Running == it_->get_state())  )
00043                // Remark: the second condition is necessary because
00044                //         of the fact that tasks treated in a task_container
00045                //         return RUNNING as soon as task_container.run()
00046                //         is called. This is necessary to avoid race conditions.
00047                 {
00048                     try
00049                     {
00050                         impl::runtime::get_impl(*it_)->visit_args (adp.get ());
00051                     }
00052                     catch (std::exception e)
00053                     {
00054                         return false;
00055                     }
00056                 }
00057                 ++it_;
00058             }
00059             return true;
00060         }
00061         
00062         void bulk_strategy_try_exec::apply(sorted_tc_type & analyser_res)
00063         {
00064 //             this->thread_ = simple_future <int> (
00065 //                         TR1::bind (&bulk_strategy_try_exec::bond_apply, 
00066 //                                               this, TR1::ref(analyser_res)) );
00067 //             //this->complete_wait(); 
00068                this->bond_apply(analyser_res);                               
00069         }
00070         
00071         int bulk_strategy_try_exec::bond_apply(sorted_tc_type & analyser_res)
00072         {
00073             using namespace boost;
00074             using namespace saga::impl;
00075 
00076 //            std::cout<< "bulk_strategy_try_exec::apply size INPUT : "<< analyser_res.size() << std::endl;
00077             
00078             // iterate over each pair: string -- taskcontainer 
00079             // sorted_tc_type::iterator it = analyser_res.begin();
00080             sorted_tc_type::iterator it_end = analyser_res.end();
00081             for (sorted_tc_type::iterator it = analyser_res.begin(); 
00082                  it != it_end; ++it)
00083             {
00084                 // get the taskcontainer from out the pair.
00085                 saga::task_container tmp_tc = it->second;
00086                 
00087                 // just for testing
00088 //                 std::cout << "Try_exec: task container: " << it->first << std::endl;
00089 //                 std::cout << "Try_exec: ----------> Number in there: " << 
00090 //                               (unsigned int)(it->second.list_tasks() ).size()
00091 //                           << std::endl;
00092                 
00093                 saga::impl::v1_0::preference_type prefs;
00094                 
00095                 std::string cpi_name;
00096                 std::string op_name; 
00097                 this->default_process_func_name((it->first), cpi_name, op_name);
00098                 std::vector<saga::uuid> used_adaptors;
00099                 
00100                 size_t nb_tasks_todo = (tmp_tc.list_tasks()).size();
00101                 
00102                 saga::task first_task = get_first_task(tmp_tc);
00103                 TR1::shared_ptr <saga::impl::v1_0::cpi> my_bulk_adaptor;
00104                 
00105                 // as long as there is as least 1 task, which couldn't have been
00106                 // treated
00107                 while (nb_tasks_todo >= 1) 
00108                 {
00109                     if (nb_tasks_todo > 1)
00110                     {
00111                         // asking the first task, to search an appropriate adaptor for 
00112                         // an possible bulk execution 
00113                         my_bulk_adaptor.reset(
00114                             runtime::get_impl(first_task)->get_bulk_adaptor(
00115                                 cpi_name, op_name, prefs) );
00116                     }
00117 
00118                     bool adaptor_used_before = true;
00119 
00120                     // check if we found a bulk adaptor
00121                     if (my_bulk_adaptor)
00122                     {
00123                         std::vector<saga::uuid>::iterator res_find =
00124                                               std::find (used_adaptors.begin(), 
00125                                                          used_adaptors.end(), 
00126                                                          my_bulk_adaptor->get_adaptor_uuid() );
00127 
00128                         // check if the found bulk adaptor is a new one.
00129                         if ( res_find == used_adaptors.end() )
00130                         {
00131                             adaptor_used_before = false;
00132                             
00133 //                             std::cout<<"found bulk adaptor ... "<<std::endl;
00134 
00135                             my_bulk_adaptor->init_bulk();
00136                             
00137                             if (prepare_args_for_adaptor(tmp_tc, my_bulk_adaptor) )
00138                             {
00139                                 std::vector<saga::uuid> done_tasks = my_bulk_adaptor->execute_bulk();
00140 
00141                                 runtime::get_impl(tmp_tc)->set_state_by_uuid(done_tasks,
00142                                                     saga::task_base::Done);
00143                                                     
00144                                 nb_tasks_todo -= done_tasks.size();
00145                             }
00146                             used_adaptors.push_back(my_bulk_adaptor->get_adaptor_uuid() );
00147                         }
00148                     }
00149                     // if there is no suitable adaptor, we execute the tasks
00150                     // one by one, and switch to the next package.
00151                     if ( (!my_bulk_adaptor) || (adaptor_used_before)  )
00152                     {
00153 //                         std::cout<<"NO bulk adaptor found ... "<<std::endl;
00154 
00155                         // execute this task_container one by one!
00156                         runtime::get_impl(tmp_tc)->simple_run();
00157                         break;
00158                     }
00159                 } // end while
00160             } // end for
00161             
00162             return 1; // just for the future;
00163         }
00164     } // namespace adaptors
00165 }     // namespace saga
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines