/* TAGS: ext */ /* VERIFY_OPTS: -o nofail:malloc */ /* CC_OPTS: */ // V: safety // V: local V_OPT: --nontermination local // V: global V_OPT: --nontermination global #include #include #include #include #include #include #include #include #include template < typename T > class Queue { public: T pop() { std::unique_lock lk( _m ); _popSig.wait( lk, [this]{ return !_q.empty() || _shutdown || _kill; } ); // same as: // while ( _q.empty() && !_shutdown && !_kill ) // _popSig.wait(); if ( _kill ) throw std::runtime_error( "Queue: kill" ); if ( _shutdown && _q.empty() ) throw std::runtime_error( "Queue: shutdown empty queue" ); auto r = std::move( _q.front() ); _q.pop_front(); return r; } std::optional< T > try_pop() { std::lock_guard lk( _m ); if ( _kill ) throw std::runtime_error( "Queue: kill" ); if ( _q.empty() ) return std::nullopt; auto r = std::move( _q.front() ); _q.pop_front(); return { r }; } void push( const T &x ) { { std::lock_guard lk( _m ); _q.push_back( x ); } _popSig.notify_one(); } template< typename C > void push( const C& c ) { { std::lock_guard lk( _m ); _q.insert( _q.end(), c.begin(), c.end() ); } _popSig.notify_all(); } void push( std::initializer_list< T > il ) { push<>( il ); } bool empty() const { // The mutex can be locked even in a const method, because it is // mutable (see below). std::lock_guard lk( _m ); return _q.empty(); } void clear() { std::lock_guard lk( _m ); _q.clear(); } void shutdown() { std::lock_guard lk( _m ); _shutdown = true; _popSig.notify_all(); } void cancel_shutdown() { std::lock_guard lk( _m ); _shutdown = false; } bool shutting_down() { std::lock_guard lk( _m ); return _shutdown; } void kill() { std::lock_guard lk( _m ); _kill = true; _popSig.notify_all(); } private: std::deque< T > _q; // Note: This is one of the (rare) cases where using mutable is OK. // Modifying (i.e. locking) the mutex does not change the externally // visible state of the object. mutable std::mutex _m; std::condition_variable _popSig; bool _shutdown = false, _kill = false; }; constexpr int CONSUMERS = 2; constexpr int COUNT = 1 * CONSUMERS; void consume( Queue< int >& q, std::array< std::atomic< int >, COUNT > &seen ) { int last = -1; try { for( ;; ) { int x = q.pop(); ++seen[ x ]; assert( last < x ); last = x; } } catch ( std::runtime_error & ) { /* queue shutdown */ } } void produce( int initial, int count, Queue< int >& q ) { for ( int i = 0; i != count; i++ ) { q.push( initial + i ); } } int main() { Queue< int > q; std::array< std::atomic< int >, COUNT > seen{}; std::vector< std::thread > consumers; for ( int i = 0; i != CONSUMERS; i++ ) consumers.emplace_back( consume, std::ref( q ), std::ref( seen ) ); produce( 0, COUNT, q ); q.shutdown(); for ( auto& t: consumers ) t.join(); for ( auto &v : seen ) assert( v == 1 ); }