SAGA Adaptor CPI v.1.0
|
00001 // Copyright (c) 2005 Stephan Hirmer (shirmer@cct.lsu.edu) 00002 // 00003 // Distributed under the Boost Software License, Version 1.0. (See accompanying 00004 // job LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 00005 00006 #include <utility> 00007 00008 #include <saga/saga/base.hpp> 00009 #include <saga/saga/util.hpp> 00010 #include <saga/saga/adaptors/task.hpp> 00011 #include <saga/impl/runtime.hpp> 00012 #include <saga/saga/adaptors/bulk_strategy_try_exec.hpp> 00013 00014 #include <saga/impl/engine/task_container.hpp> 00015 00016 namespace saga 00017 { 00018 namespace adaptors 00019 { 00020 bulk_strategy_try_exec::bulk_strategy_try_exec() 00021 { 00022 } 00023 00024 saga::task bulk_strategy_try_exec::get_first_task(saga::task_container& tc) 00025 { 00026 std::vector<saga::task> bulk_task_list = tc.list_tasks(); 00027 std::vector<saga::task>::iterator tmp_first_task = 00028 bulk_task_list.begin(); 00029 00030 return *tmp_first_task; 00031 } 00032 00033 bool bulk_strategy_try_exec::prepare_args_for_adaptor(saga::task_container& tc, 00034 TR1::shared_ptr<saga::impl::v1_0::cpi> adp) 00035 { 00036 std::vector<saga::task> bulk_task_list = tc.list_tasks(); 00037 std::vector<saga::task>::iterator it_ = bulk_task_list.begin(); 00038 00039 while ( it_ != bulk_task_list.end () ) 00040 { 00041 if( (saga::task_base::New == it_->get_state()) 00042 || (saga::task_base::Running == it_->get_state()) ) 00043 // Remark: the second condition is necessary because 00044 // of the fact that tasks treated in a task_container 00045 // return RUNNING as soon as task_container.run() 00046 // is called. This is necessary to avoid race conditions. 00047 { 00048 try 00049 { 00050 impl::runtime::get_impl(*it_)->visit_args (adp.get ()); 00051 } 00052 catch (std::exception e) 00053 { 00054 return false; 00055 } 00056 } 00057 ++it_; 00058 } 00059 return true; 00060 } 00061 00062 void bulk_strategy_try_exec::apply(sorted_tc_type & analyser_res) 00063 { 00064 // this->thread_ = simple_future <int> ( 00065 // TR1::bind (&bulk_strategy_try_exec::bond_apply, 00066 // this, TR1::ref(analyser_res)) ); 00067 // //this->complete_wait(); 00068 this->bond_apply(analyser_res); 00069 } 00070 00071 int bulk_strategy_try_exec::bond_apply(sorted_tc_type & analyser_res) 00072 { 00073 using namespace boost; 00074 using namespace saga::impl; 00075 00076 // std::cout<< "bulk_strategy_try_exec::apply size INPUT : "<< analyser_res.size() << std::endl; 00077 00078 // iterate over each pair: string -- taskcontainer 00079 // sorted_tc_type::iterator it = analyser_res.begin(); 00080 sorted_tc_type::iterator it_end = analyser_res.end(); 00081 for (sorted_tc_type::iterator it = analyser_res.begin(); 00082 it != it_end; ++it) 00083 { 00084 // get the taskcontainer from out the pair. 00085 saga::task_container tmp_tc = it->second; 00086 00087 // just for testing 00088 // std::cout << "Try_exec: task container: " << it->first << std::endl; 00089 // std::cout << "Try_exec: ----------> Number in there: " << 00090 // (unsigned int)(it->second.list_tasks() ).size() 00091 // << std::endl; 00092 00093 saga::impl::v1_0::preference_type prefs; 00094 00095 std::string cpi_name; 00096 std::string op_name; 00097 this->default_process_func_name((it->first), cpi_name, op_name); 00098 std::vector<saga::uuid> used_adaptors; 00099 00100 size_t nb_tasks_todo = (tmp_tc.list_tasks()).size(); 00101 00102 saga::task first_task = get_first_task(tmp_tc); 00103 TR1::shared_ptr <saga::impl::v1_0::cpi> my_bulk_adaptor; 00104 00105 // as long as there is as least 1 task, which couldn't have been 00106 // treated 00107 while (nb_tasks_todo >= 1) 00108 { 00109 if (nb_tasks_todo > 1) 00110 { 00111 // asking the first task, to search an appropriate adaptor for 00112 // an possible bulk execution 00113 my_bulk_adaptor.reset( 00114 runtime::get_impl(first_task)->get_bulk_adaptor( 00115 cpi_name, op_name, prefs) ); 00116 } 00117 00118 bool adaptor_used_before = true; 00119 00120 // check if we found a bulk adaptor 00121 if (my_bulk_adaptor) 00122 { 00123 std::vector<saga::uuid>::iterator res_find = 00124 std::find (used_adaptors.begin(), 00125 used_adaptors.end(), 00126 my_bulk_adaptor->get_adaptor_uuid() ); 00127 00128 // check if the found bulk adaptor is a new one. 00129 if ( res_find == used_adaptors.end() ) 00130 { 00131 adaptor_used_before = false; 00132 00133 // std::cout<<"found bulk adaptor ... "<<std::endl; 00134 00135 my_bulk_adaptor->init_bulk(); 00136 00137 if (prepare_args_for_adaptor(tmp_tc, my_bulk_adaptor) ) 00138 { 00139 std::vector<saga::uuid> done_tasks = my_bulk_adaptor->execute_bulk(); 00140 00141 runtime::get_impl(tmp_tc)->set_state_by_uuid(done_tasks, 00142 saga::task_base::Done); 00143 00144 nb_tasks_todo -= done_tasks.size(); 00145 } 00146 used_adaptors.push_back(my_bulk_adaptor->get_adaptor_uuid() ); 00147 } 00148 } 00149 // if there is no suitable adaptor, we execute the tasks 00150 // one by one, and switch to the next package. 00151 if ( (!my_bulk_adaptor) || (adaptor_used_before) ) 00152 { 00153 // std::cout<<"NO bulk adaptor found ... "<<std::endl; 00154 00155 // execute this task_container one by one! 00156 runtime::get_impl(tmp_tc)->simple_run(); 00157 break; 00158 } 00159 } // end while 00160 } // end for 00161 00162 return 1; // just for the future; 00163 } 00164 } // namespace adaptors 00165 } // namespace saga