Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "dataqueue.h"
00023 #include "scoped_lock.h"
00024 #include "data.h"
00025 #include "time.h"
00026 #include <iostream>
00027
00028 using namespace std;
00029
00030 namespace Barry {
00031
00032
00033
00034
00035 DataQueue::DataQueue()
00036 {
00037 pthread_mutex_init(&m_waitMutex, NULL);
00038 pthread_cond_init(&m_waitCond, NULL);
00039
00040 pthread_mutex_init(&m_accessMutex, NULL);
00041 }
00042
00043 DataQueue::~DataQueue()
00044 {
00045 scoped_lock lock(m_accessMutex);
00046
00047 while( m_queue.size() ) {
00048 delete raw_pop();
00049 }
00050 }
00051
00052
00053 void DataQueue::raw_push(Data *data)
00054 {
00055 try {
00056 m_queue.push_back(data);
00057 }
00058 catch(...) {
00059 delete data;
00060 throw;
00061 }
00062 }
00063
00064
00065 Data* DataQueue::raw_pop()
00066 {
00067 if( m_queue.size() == 0 )
00068 return 0;
00069
00070 Data *ret = m_queue.front();
00071 m_queue.pop_front();
00072 return ret;
00073 }
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084 void DataQueue::push(Data *data)
00085 {
00086 {
00087 scoped_lock lock(m_accessMutex);
00088 raw_push(data);
00089 }
00090
00091
00092 scoped_lock wait(m_waitMutex);
00093 pthread_cond_broadcast(&m_waitCond);
00094 }
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104 Data* DataQueue::pop()
00105 {
00106 scoped_lock lock(m_accessMutex);
00107 return raw_pop();
00108 }
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120 Data* DataQueue::wait_pop(int timeout)
00121 {
00122
00123 {
00124 scoped_lock access(m_accessMutex);
00125 if( m_queue.size() ) {
00126 return raw_pop();
00127 }
00128 }
00129
00130
00131
00132 if( timeout == -1 ) {
00133
00134 int size = 0;
00135 do {
00136 {
00137 scoped_lock wait(m_waitMutex);
00138 pthread_cond_wait(&m_waitCond, &m_waitMutex);
00139 }
00140
00141
00142 scoped_lock access(m_accessMutex);
00143 size = m_queue.size();
00144 if( size != 0 ) {
00145
00146 return raw_pop();
00147 }
00148
00149 } while( size == 0 );
00150 }
00151 else {
00152
00153 struct timespec to;
00154 scoped_lock wait(m_waitMutex);
00155 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
00156 ThreadTimeout(timeout, &to));
00157 }
00158
00159 scoped_lock access(m_accessMutex);
00160 return raw_pop();
00161 }
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177 void DataQueue::append_from(DataQueue &other)
00178 {
00179 scoped_lock us(m_accessMutex);
00180 scoped_lock them(other.m_accessMutex);
00181
00182 while( other.m_queue.size() ) {
00183 raw_push( other.m_queue.front() );
00184
00185
00186
00187 other.raw_pop();
00188 }
00189 }
00190
00191
00192
00193
00194
00195
00196 bool DataQueue::empty() const
00197 {
00198 scoped_lock access(m_accessMutex);
00199 return m_queue.size() == 0;
00200 }
00201
00202
00203
00204
00205
00206
00207 size_t DataQueue::size() const
00208 {
00209 scoped_lock access(m_accessMutex);
00210 return m_queue.size();
00211 }
00212
00213 void DataQueue::DumpAll(std::ostream &os) const
00214 {
00215
00216
00217
00218 scoped_lock access(m_accessMutex);
00219 queue_type::const_iterator b = m_queue.begin(), e = m_queue.end();
00220 for( ; b != e; ++b ) {
00221 os << **b << endl;
00222 }
00223 }
00224
00225 }
00226
00227
00228 #ifdef __DQ_TEST_MODE__
00229
00230 #include <iostream>
00231
00232 using namespace std;
00233 using namespace Barry;
00234
00235 void *WriteThread(void *userdata)
00236 {
00237 DataQueue *dq = (DataQueue*) userdata;
00238
00239 dq->push( new Data );
00240 dq->push( new Data );
00241 sleep(5);
00242 dq->push( new Data );
00243
00244 return 0;
00245 }
00246
00247 void *ReadThread(void *userdata)
00248 {
00249 DataQueue *dq = (DataQueue*) userdata;
00250
00251 sleep(1);
00252 if( Data *d = dq->pop() ) {
00253 cout << "Received via pop: " << d << endl;
00254 delete d;
00255 }
00256 else {
00257 cout << "No data in the queue yet." << endl;
00258 }
00259
00260 while( Data *d = dq->wait_pop(5010) ) {
00261 cout << "Received: " << d << endl;
00262 delete d;
00263 }
00264 return 0;
00265 }
00266
00267 int main()
00268 {
00269 DataQueue from;
00270 from.push( new Data );
00271
00272 DataQueue dq;
00273 dq.append_from(from);
00274
00275 pthread_t t1, t2;
00276 pthread_create(&t1, NULL, &ReadThread, &dq);
00277 pthread_create(&t2, NULL, &WriteThread, &dq);
00278
00279 pthread_join(t2, NULL);
00280 pthread_join(t1, NULL);
00281 }
00282
00283 #endif
00284