Check in some SPECsfs runs for CIFS.
[bluesky.git] / kvstore / workqueue.cc
1 #include "workqueue.h"
2 #include <boost/thread.hpp>
3
4 namespace bicker
5 {
6
7 WorkQueue::WorkQueue()
8     :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
9 {
10     for (int i = 0; i < _min_threads; ++i)
11     {
12         spawnThread();
13     }
14 }
15
16 WorkQueue::WorkQueue(int thread_count)
17     :_thread_count(0), 
18      _min_threads(1), _max_threads(thread_count), _running(true)
19 {
20     for (int i = 0; i < _min_threads; ++i)
21     {
22         spawnThread();
23     }
24 }
25
26 WorkQueue::~WorkQueue()
27 {
28     _running = false;
29
30     {
31         mutex::scoped_lock lock(_queue_lock);
32         _queue_non_empty.notify_all();
33     }
34
35     _threads.join_all();
36 }
37
38 void WorkQueue::spawnThread()
39 {
40     ++_thread_count;
41     _threads.create_thread(Worker(this));
42 }
43
44 shared_ptr<WorkUnit> WorkQueue::get()
45 {
46     mutex::scoped_lock lock(_queue_lock);
47
48     while (_queue.size() == 0)
49     {
50         _queue_non_empty.wait(lock);
51         if (!_running) throw interrupted_error();
52     }
53
54     shared_ptr<WorkUnit> back = _queue.front();
55     _queue.pop();
56
57     if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
58
59     return back;
60 }
61
62 void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
63 {
64     mutex::scoped_lock lock(_queue_lock);
65
66     _queue.push(work_unit);
67
68     _queue_non_empty.notify_one();
69 }
70
71 WorkQueue::Worker::Worker(WorkQueue* queue)
72     :_queue(queue)
73 {
74 }
75
76 void WorkQueue::Worker::operator()()
77 {
78     while (true)
79     {
80         try
81         {
82             shared_ptr<WorkUnit> unit = _queue->get();
83
84             unit->run();
85         }
86         catch (interrupted_error)
87         {
88             return;
89         }
90     }
91 }
92
93 TaskNotification::TaskNotification()
94 :_expected(0), _count(0), _fail_count(0)
95 {
96 }
97
98 void TaskNotification::registerTask()
99 {
100     mutex::scoped_lock lock(_lock);
101     ++_expected;
102 }
103
104 void TaskNotification::completeTask(bool success)
105 {
106     mutex::scoped_lock lock(_lock);
107     if (!success) ++_fail_count;
108     if (++_count == _expected) _cond.notify_all();
109 }
110
111 void TaskNotification::waitForComplete()
112 {
113     mutex::scoped_lock lock(_lock);
114     while (_count < _expected)
115     {
116         _cond.wait(lock);
117     }
118 }
119
120 bool TaskNotification::failCount()
121 {
122     return _fail_count;
123 }
124
125 } // namespace bicker