SAGA Adaptor CPI v.1.0
|
00001 #if defined(__WAVE__) && defined(SAGA_CREATE_PREPROCESSED_FILES) 00002 #pragma wave option(preserve: 2, line: 1, output: "preprocessed/stream.cpp") 00003 #endif 00004 // Copyright (c) 2005-2007 Andre Merzky (andre@merzky.net) 00005 // Copyright (c) 2005-2009 Hartmut Kaiser 00006 // 00007 // Use, modification and distribution is subject to the Boost Software 00008 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 00009 // http://www.boost.org/LICENSE_1_0.txt) 00010 00011 #if defined(__WAVE__) && defined(SAGA_CREATE_PREPROCESSED_FILES) 00012 #pragma wave option(preserve: 0, output: null) 00013 #endif 00014 00015 // this is needed in every file including detail/attribute_impl.hpp and not 00016 // belonging to the engine 00017 #define SAGA_NO_IMPORT_ATTRIBUTE 00018 00019 // this is needed in every file including detail/monitorable_impl.hpp and not 00020 // belonging to the engine 00021 #define SAGA_NO_IMPORT_MONITORABLE 00022 00023 #include <boost/assign/list_inserter.hpp> 00024 #include <boost/assign/std/vector.hpp> 00025 00026 // include the package we implement 00027 #include <saga/saga/context.hpp> 00028 #include <saga/saga/stream.hpp> 00029 #include <saga/impl/stream.hpp> 00030 #include <saga/saga/detail/call.hpp> 00031 00032 #include <saga/saga/detail/attribute_impl.hpp> 00033 #include <saga/saga/detail/monitorable_impl.hpp> 00034 00035 #ifdef SAGA_DEBUG 00036 #include <saga/saga/packages/stream/preprocessed/stream.cpp> 00037 #else 00038 00039 #if defined(__WAVE__) && defined(SAGA_CREATE_PREPROCESSED_FILES) 00040 #pragma wave option(preserve: 2, line: 1, output: "preprocessed/stream.cpp") 00041 #endif 00042 00043 namespace saga 00044 { 00045 00046 namespace stream 00047 { 00048 00049 namespace metrics 00050 { 00052 saga::metrics::init_data const stream_metric_data[] = 00053 { 00054 { 00055 saga::stream::metrics::stream_state, 00056 "Metric fires if the state of the stream changes," 00057 " and has the value of the new state.", 00058 saga::attributes::metric_mode_readonly, 00059 "1", 00060 saga::attributes::metric_type_enum, 00061 "1" // New 00062 }, 00063 { 00064 saga::stream::metrics::stream_read, 00065 "Metric fires if a stream gets readable.", 00066 saga::attributes::metric_mode_readonly, 00067 "1", 00068 saga::attributes::metric_type_trigger, 00069 "0" 00070 }, 00071 { 00072 saga::stream::metrics::stream_write, 00073 "Metric fires if a stream gets writable.", 00074 saga::attributes::metric_mode_readonly, 00075 "1", 00076 saga::attributes::metric_type_trigger, 00077 "0" 00078 }, 00079 { 00080 saga::stream::metrics::stream_exception, 00081 "Metric fires if a stream has an error condition.", 00082 saga::attributes::metric_mode_readonly, 00083 "1", 00084 saga::attributes::metric_type_trigger, 00085 "0" 00086 }, 00087 { 00088 saga::stream::metrics::stream_dropped, 00089 "Metric fires if a stream gets dropped by the remote party.", 00090 saga::attributes::metric_mode_readonly, 00091 "1", 00092 saga::attributes::metric_type_trigger, 00093 "0" 00094 } 00095 }; 00097 } 00098 00099 00100 00101 stream::stream (session const& s, saga::url url) 00102 : saga::object (new saga::impl::stream (s, url)) 00103 { 00104 init_attributes(); 00105 init_metrics(); 00106 this->saga::object::get_impl()->init(); 00107 } 00108 00109 stream::stream (saga::url url) 00110 : saga::object (new saga::impl::stream (detail::get_the_session (), url)) 00111 { 00112 init_attributes(); 00113 init_metrics(); 00114 this->saga::object::get_impl()->init(); 00115 } 00116 00117 stream::stream (saga::impl::stream *impl) 00118 : saga::object (impl) 00119 { 00120 init_attributes(); 00121 init_metrics(); 00122 this->saga::object::get_impl()->init(); 00123 } 00124 00125 stream::stream (int) 00126 { 00127 } 00128 00129 void stream::init_attributes() 00130 { 00131 using namespace boost::assign; 00132 00133 // initialize attributes 00134 std::vector<std::string> valid_keys; 00135 valid_keys += 00136 attributes::stream_bufsize, 00137 attributes::stream_timeout, 00138 attributes::stream_blocking, 00139 attributes::stream_compression, 00140 attributes::stream_nodelay, 00141 attributes::stream_reliable 00142 ; 00143 00144 // initialize list of valid keys 00145 this->init_keynames(valid_keys); 00146 00147 strmap_type attributes_scalar_rw; 00148 insert(attributes_scalar_rw) 00149 (attributes::stream_bufsize, "") 00150 (attributes::stream_timeout, "") 00151 (attributes::stream_blocking, "") 00152 (attributes::stream_compression, "") 00153 (attributes::stream_nodelay, "") 00154 (attributes::stream_reliable, "") 00155 ; 00156 00157 // initialize attribute implementation 00158 this->attribute_base_type::init (strmap_type(), attributes_scalar_rw); 00159 } 00160 00161 void stream::init_metrics() 00162 { 00163 // initialize metrics 00164 std::vector<saga::metric> metrics; 00165 for (unsigned int i = 0; 00166 i < sizeof(metrics::stream_metric_data)/sizeof(saga::metrics::init_data); 00167 ++i) 00168 { 00169 saga::metrics::init_data const* p = &saga::stream::metrics::stream_metric_data[i]; 00170 saga::metric m(*this, p->name, p->description, p->mode, p->unit, 00171 p->type, p->value); 00172 metrics.push_back(m); 00173 } 00174 this->monitorable_base_type::init (metrics); 00175 } 00176 00177 stream::stream (void) 00178 : saga::object (new saga::impl::stream (detail::get_the_session(), saga::url())) 00179 { 00180 init_attributes(); 00181 init_metrics(); 00182 } 00183 00184 stream::~stream (void) 00185 { 00186 } 00187 00189 stream::stream (saga::object const& o) 00190 : saga::object (o) 00191 { 00192 if (this->get_type() != saga::object::Stream) 00193 { 00194 SAGA_THROW("Bad type conversion.", saga::BadParameter); 00195 } 00196 } 00197 00198 stream &stream::operator= (saga::object const& o) 00199 { 00200 return saga::object::operator=(o), *this; 00201 } 00202 00203 saga::impl::stream* stream::get_impl() const 00204 { 00205 typedef saga::object base_type; 00206 return static_cast<saga::impl::stream*>(this->base_type::get_impl()); 00207 } 00208 00209 TR1::shared_ptr <saga::impl::stream> stream::get_impl_sp(void) const 00210 { 00211 // FIXME: this needs documentation 00212 typedef saga::object base_type; 00213 return TR1::static_pointer_cast <saga::impl::stream>( 00214 this->base_type::get_impl_sp()); 00215 } 00216 00217 // factory 00218 SAGA_CALL_CREATE_IMP_2(stream, impl::stream, session const&, saga::url) 00219 00220 // stream inspection methods 00221 SAGA_CALL_CONST_IMP_0 (stream, get_url) 00222 SAGA_CALL_CONST_IMP_0 (stream, get_context) 00223 00224 // stream methods 00225 SAGA_CALL_IMP_1 (stream, connect, double) 00226 SAGA_CALL_IMP_2 (stream, wait, saga::stream::activity, double) 00227 SAGA_CALL_IMP_1 (stream, close, double) 00228 SAGA_CALL_IMP_2 (stream, read, saga::mutable_buffer, saga::ssize_t) 00229 SAGA_CALL_IMP_2 (stream, write, saga::const_buffer, saga::ssize_t) 00230 00231 } // namespace saga 00232 00233 namespace detail 00234 { 00236 // implement the attribute functions (we need to explicitly specialize 00237 // the template because the functions are not implemented inline) 00238 template struct SAGA_STREAM_PACKAGE_EXPORT_REPEAT attribute<stream::stream>; 00239 00240 template struct SAGA_STREAM_PACKAGE_EXPORT attribute_priv<stream::stream, task_base::Async>; 00241 template struct SAGA_STREAM_PACKAGE_EXPORT attribute_priv<stream::stream, task_base::Task>; 00242 template struct SAGA_STREAM_PACKAGE_EXPORT attribute_priv<stream::stream, task_base::Sync>; 00243 00244 template struct SAGA_STREAM_PACKAGE_EXPORT attribute_sync<stream::stream>; 00245 00247 // implement the monitorable functions (we need to explicitly specialize 00248 // the template because the functions are not implemented inline) 00249 template struct SAGA_STREAM_PACKAGE_EXPORT monitorable<stream::stream>; 00250 } 00251 00252 namespace adaptors 00253 { 00254 saga::stream::state stream_state_value_to_enum(std::string const& val) 00255 { 00256 if (val == saga::stream::attributes::stream_state_new) 00257 return saga::stream::New; 00258 00259 if (val == saga::stream::attributes::stream_state_open) 00260 return saga::stream::Open; 00261 00262 if (val == saga::stream::attributes::stream_state_closed) 00263 return saga::stream::Closed; 00264 00265 if (val == saga::stream::attributes::stream_state_dropped) 00266 return saga::stream::Dropped; 00267 00268 if (val == saga::stream::attributes::stream_state_error) 00269 return saga::stream::Error; 00270 00271 return saga::stream::Unknown; 00272 } 00273 00274 std::string stream_state_enum_to_value(int s) 00275 { 00276 switch(s) { 00277 case saga::stream::New: 00278 return saga::stream::attributes::stream_state_new; 00279 00280 case saga::stream::Open: 00281 return saga::stream::attributes::stream_state_open; 00282 00283 case saga::stream::Closed: 00284 return saga::stream::attributes::stream_state_closed; 00285 00286 case saga::stream::Dropped: 00287 return saga::stream::attributes::stream_state_dropped; 00288 00289 case saga::stream::Error: 00290 return saga::stream::attributes::stream_state_error; 00291 00292 default: 00293 case saga::stream::Unknown: 00294 break; 00295 } 00296 return saga::stream::attributes::stream_state_unknown; 00297 } 00298 } 00299 00301 } 00302 #endif // SAGA_DEBUG 00303