- From: boulos@sci.utah.edu
- To: manta@sci.utah.edu
- Subject: [MANTA] r1515 - in trunk: Core Core/Util Engine/ImageTraversers
- Date: Thu, 19 Jul 2007 19:05:59 -0600 (MDT)
Author: boulos
Date: Thu Jul 19 19:05:59 2007
New Revision: 1515
Added:
trunk/Core/Util/ApproximatePriorityQueue.h
Modified:
trunk/Core/CMakeLists.txt
trunk/Engine/ImageTraversers/DeadlineImageTraverser.cc
trunk/Engine/ImageTraversers/DeadlineImageTraverser.h
Log:
Core/CMakeLists.txt
Core/Util/ApproximatePriorityQueue.h
Adding a simple ApproximatePriorityQueue that lets a parallel
implementation trade between higher contention with more exact
priorities and lower contention with more approximate priorities.
Engine/ImageTraversers/DeadlineImageTraverser.cc
Engine/ImageTraversers/DeadlineImageTraverser.h
Using the new ApproximatePriorityQueue and avoiding locking.
Currently tiles are not destroyed and are allocated with new in inner
loops. With Google TCMalloc this seems to be okay, but it could
probably be improved with a custom MemoryPool.
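
As a point of reference, the custom MemoryPool hinted at above could be
a simple per-thread free list that recycles Tile allocations instead of
calling new in the inner loop. This is only a sketch of the idea, not
code from this commit; the names and the recycling points are
assumptions:

  // Hypothetical sketch, not part of r1515: a per-thread pool that
  // hands back recycled objects instead of hitting the allocator.
  #include <vector>

  template<class T>
  class MemoryPool {
  public:
    ~MemoryPool() {
      // The pool owns whatever has been released back to it.
      for (size_t i = 0; i < free_list.size(); i++)
        delete free_list[i];
    }
    T* allocate() {
      if (free_list.empty())
        return new T; // fall back to the allocator when the pool is dry
      T* result = free_list.back();
      free_list.pop_back();
      return result;
    }
    void release(T* object) {
      free_list.push_back(object); // recycle instead of delete
    }
  private:
    std::vector<T*> free_list; // one pool per thread, so no locking needed
  };

With one MemoryPool<Tile> per rendering thread, the new Tile calls in
the inner loops could become pool.allocate(), and finished tiles could
be released back without any locking.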
Modified: trunk/Core/CMakeLists.txt
==============================================================================
--- trunk/Core/CMakeLists.txt (original)
+++ trunk/Core/CMakeLists.txt Thu Jul 19 19:05:59 2007
@@ -60,6 +60,7 @@
SET (CORE_SOURCES ${CORE_SOURCES}
Util/AlignedAllocator.h
Util/AlignedAllocator.cc
+ Util/ApproximatePriorityQueue.h
Util/Args.h
Util/Args.cc
Util/CPUTime.h
Added: trunk/Core/Util/ApproximatePriorityQueue.h
==============================================================================
--- (empty file)
+++ trunk/Core/Util/ApproximatePriorityQueue.h Thu Jul 19 19:05:59 2007
@@ -0,0 +1,208 @@
+#ifndef MANTA_CORE_UTIL_APPROXIMATE_PRIORITY_QUEUE_H_
+#define MANTA_CORE_UTIL_APPROXIMATE_PRIORITY_QUEUE_H_
+
+#include <Core/Util/PriorityQueue.h>
+
+#include <Core/Math/CheapRNG.h>
+#include <Core/Thread/Mutex.h>
+#include <vector>
+
+namespace Manta {
+
+  // This class represents a priority queue that is approximate by way
+  // of randomization. A push goes into a priority queue chosen at
+  // random, and a pop is similarly taken from another randomly chosen
+  // queue. Each queue is protected by a mutex to ensure thread-safe
+  // usage. For this reason, queries like empty() or size() are
+  // expensive because they require locking every queue for access.
+  // Probabilistically speaking, more queues yield less contention but
+  // a more approximate priority ordering, while fewer queues yield
+  // more contention but a less approximate ordering (the limits being
+  // no contention with no priority metric at all, or 100% contention
+  // with exact priorities).
+
+ template<class DataType, class PriorityType>
+ class ApproximatePriorityQueue {
+ public:
+ typedef PriorityQueue<DataType, PriorityType> QueueType;
+
+ ApproximatePriorityQueue(size_t num_queues, size_t max_threads = 64) :
+ num_queues(num_queues), max_threads(max_threads) {
+ rngs = new CheapRNG[max_threads];
+ queues = new QueueType[num_queues];
+ for (size_t i = 0; i < num_queues; i++) {
+ // TODO(boulos): Number them using ostringstream? Seems pretty
+ // heavyweight include...
+      mutexes.push_back(new SCIRun::Mutex("ApproximatePriorityQueue mutex"));
+ }
+ }
+
+ ~ApproximatePriorityQueue() {
+ grabAllLocks();
+
+ delete[] rngs;
+ delete[] queues;
+
+ releaseAllLocks();
+ }
+
+ // NOTE(boulos): To get an accurate "everything empty" test, we
+ // must first grab all the locks and then check. We are only
+ // empty if all queues are empty, so we simply AND this result
+ // across all the queues.
+ bool empty() const {
+ grabAllLocks();
+ bool is_empty = true;
+ for (size_t i = 0; i < num_queues; i++) {
+ is_empty &= queues[i].empty();
+ }
+ releaseAllLocks();
+ return is_empty;
+ }
+
+ bool empty(size_t which_queue) {
+ ASSERT(which_queue < num_queues);
+ grabLock(which_queue);
+ bool result = queues[which_queue].empty();
+ releaseLock(which_queue);
+ return result;
+ }
+
+ // As above, an accurate size request requires that the queues
+ // aren't changing, so we must first grab all the locks.
+ size_t size() const {
+ grabAllLocks();
+ size_t sum = 0;
+ for (size_t i = 0; i < num_queues; i++) {
+ sum += queues[i].size();
+ }
+ releaseAllLocks();
+ return sum;
+ }
+
+ size_t size(size_t which_queue) const {
+ ASSERT(which_queue < num_queues);
+ grabLock(which_queue);
+ size_t result = queues[which_queue].size();
+ releaseLock(which_queue);
+ return result;
+ }
+
+ // NOTE(boulos): Reservation doesn't require grabAllLocks since we
+ // can just lock for each queue.
+ void reserve(size_t new_size) {
+ for (size_t i = 0; i < num_queues; i++) {
+ reserve(new_size, i);
+ }
+ }
+
+ void reserve(size_t new_size, size_t which_queue) {
+ ASSERT(which_queue < num_queues);
+ grabLock(which_queue);
+ queues[which_queue].reserve(new_size);
+ releaseLock(which_queue);
+ }
+
+    // To actually have this entire ApproximatePriorityQueue empty, we
+    // need to grab all the locks first.
+ void clear() {
+ grabAllLocks();
+ for (size_t i = 0; i < num_queues; i++) {
+ queues[i].clear();
+ }
+ releaseAllLocks();
+ }
+
+ void clear(size_t which_queue) {
+ ASSERT(which_queue < num_queues);
+ grabLock(which_queue);
+ queues[which_queue].clear();
+ releaseLock(which_queue);
+ }
+
+    // NOTE(boulos): Since top and pop might run into empty queues
+    // even though the whole set of queues is not empty (as can happen
+    // near the end of the priority queue's usage), we retry at least
+    // num_queues times. Probabilistically speaking, we should then
+    // have visited all the queues. Naturally, before actually deciding
+    // there are no elements left, an actual size() call should be made.
+ DataType top(size_t thread_id) const {
+ ASSERT(thread_id < max_threads);
+
+ size_t iterations = 0;
+ do {
+ size_t which_queue = rngs[thread_id].nextInt() % num_queues;
+ grabLock(which_queue);
+ if (!queues[which_queue].empty()) {
+ // Found one
+ DataType result = queues[which_queue].top();
+ releaseLock(which_queue);
+ return result;
+ }
+ releaseLock(which_queue);
+ } while (iterations++ < num_queues);
+
+ return DataType(0);
+ }
+
+ DataType pop(size_t thread_id) const {
+ ASSERT(thread_id < max_threads);
+
+ size_t iterations = 0;
+ do {
+ size_t which_queue = rngs[thread_id].nextInt() % num_queues;
+ grabLock(which_queue);
+ if (!queues[which_queue].empty()) {
+ // Found one
+ DataType result = queues[which_queue].pop();
+ releaseLock(which_queue);
+ return result;
+ }
+ releaseLock(which_queue);
+ } while (iterations++ < num_queues);
+
+ return DataType(0);
+ }
+
+ // NOTE(boulos): We can always push, so there's no need for the
+ // looping construct in the top and pop functions.
+    void push(const DataType& data, const PriorityType& priority, size_t thread_id) const {
+ ASSERT(thread_id < max_threads);
+
+ size_t which_queue = rngs[thread_id].nextInt() % num_queues;
+ grabLock(which_queue);
+ queues[which_queue].push(data, priority);
+ releaseLock(which_queue);
+ }
+
+ private:
+ inline void grabLock(size_t which_lock) const {
+ mutexes[which_lock]->lock();
+ }
+ inline void releaseLock(size_t which_lock) const {
+ mutexes[which_lock]->unlock();
+ }
+
+ void grabAllLocks() const {
+ for (size_t i = 0; i < num_queues; i++) {
+ grabLock(i);
+ }
+ }
+
+ void releaseAllLocks() const {
+ for (size_t i = 0; i < num_queues; i++) {
+ releaseLock(i);
+ }
+ }
+
+ size_t num_queues;
+ size_t max_threads;
+ CheapRNG* rngs;
+ QueueType* queues;
+ std::vector<SCIRun::Mutex*> mutexes;
+ };
+
+}; // end namespace Manta
+
+#endif // MANTA_CORE_UTIL_APPROXIMATE_PRIORITY_QUEUE_H_
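
To make the interface above concrete, here is a small usage sketch
(illustrative only, not part of this commit): each worker thread passes
its own thread_id so that its private CheapRNG picks the sub-queue, and
only the aggregate size()/empty() calls pay the cost of locking every
queue.

  // Illustrative usage of the ApproximatePriorityQueue declared above.
  // The int payload and the thread ids here are placeholders.
  #include <Core/Util/ApproximatePriorityQueue.h>

  void worker(Manta::ApproximatePriorityQueue<int, float>& queue,
              size_t thread_id) {
    // push lands in a randomly chosen sub-queue, locking only that one.
    queue.push(42, 1.0f, thread_id);

    // pop draws from random sub-queues, retrying up to num_queues
    // times; it returns DataType(0) if it keeps finding empty queues.
    int item = queue.pop(thread_id);

    // Only the expensive, all-locking size() can say for sure that the
    // whole structure is empty rather than just unlucky.
    if (item == 0 && queue.size() == 0) {
      // genuinely out of work
    }
  }

The constructor takes the number of sub-queues and the maximum thread
count, e.g. 8 sub-queues for up to 8 threads, which is what the
DeadlineImageTraverser below does with queue(8, 8).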
Modified: trunk/Engine/ImageTraversers/DeadlineImageTraverser.cc
==============================================================================
--- trunk/Engine/ImageTraversers/DeadlineImageTraverser.cc (original)
+++ trunk/Engine/ImageTraversers/DeadlineImageTraverser.cc	Thu Jul 19 19:05:59 2007
@@ -64,7 +64,7 @@
}
DeadlineImageTraverser::DeadlineImageTraverser(const vector<string>& args)
- : qlock("DeadlineImageTraverser queue lock")
+ : qlock("DeadlineImageTraverser queue lock"), queue(8, 8)
{
finished_coarse = false;
reset_every_frame = false;
@@ -290,47 +290,36 @@
}
}
-void DeadlineImageTraverser::insertIntoQueue(Tile* tiles, int numtiles)
+void DeadlineImageTraverser::insertIntoQueue(Tile* tile, size_t thread_id)
{
- qlock.lock();
- if (queue.size() + numtiles >= total_tiles) {
- cerr << "Can't resize for insertion, but I'll just drop for now." <<
endl;
- qlock.unlock();
- return;
-    throw SCIRun::InternalError("Can't currently resize during rendering", __FILE__, __LINE__);
- }
+ //qlock.lock();
- for(int i=0;i<numtiles;i++){
- Tile* tile = &tiles[i];
- queue.push(tile, tile->priority);
- }
- qlock.unlock();
+ queue.push(tile, tile->priority, thread_id);
+
+ //qlock.unlock();
}
-DeadlineImageTraverser::Tile* DeadlineImageTraverser::popFromQueue(Tile*& childtiles)
+void DeadlineImageTraverser::insertIntoQueue(vector<Tile*>& tiles, size_t thread_id)
{
- int numtiles = xrefinementRatio * yrefinementRatio;
- qlock.lock();
- if(queue.empty()) {
- qlock.unlock();
- return 0;
- }
-
- // Create space for the children of this tile
- childtiles = &tiles[next_tile];
- next_tile += numtiles;
- if (next_tile >= total_tiles) {
- cerr << "Can't resize for popFromQueue, so I'll just drop." << endl;
- qlock.unlock();
- return NULL;
-    throw SCIRun::InternalError("Can't currently resize during rendering", __FILE__, __LINE__);
+ //qlock.lock();
+
+ for(size_t i=0; i < tiles.size(); i++) {
+ Tile* tile = tiles[i];
+ queue.push(tile, tile->priority, thread_id);
}
- // Get the next tile from the queue
- Tile* tile = queue.pop();
+ //qlock.unlock();
+}
- qlock.unlock();
- return tile;
+DeadlineImageTraverser::Tile* DeadlineImageTraverser::popFromQueue(size_t thread_id) {
+ //qlock.lock();
+ //if (queue.empty()) {
+ //qlock.unlock();
+ // return 0;
+ //}
+ Tile* result = queue.pop(thread_id);
+ //qlock.unlock();
+ return result;
}
void DeadlineImageTraverser::renderImage(const RenderContext& context, Image* image)
@@ -395,7 +384,7 @@
tile->xmag = xcoarsepixelsize;
tile->ymag = ycoarsepixelsize;
computePriority(tile, frag);
- insertIntoQueue(tile, 1);
+ insertIntoQueue(tile, static_cast<size_t>(context.proc));
} else {
context.pixelSampler->renderFragment(context, frag);
}
@@ -409,17 +398,19 @@
if (!finished_coarse)
finished_coarse = true;
+ vector<Tile*> childtiles;
+ childtiles.reserve(xrefinementRatio * yrefinementRatio);
+
while(CPUTime::currentSeconds() < frameEnd){
- Tile* childtiles;
- Tile* tile = popFromQueue(childtiles);
+ Tile* tile = popFromQueue(static_cast<size_t>(context.proc));
if(!tile)
continue;
+ childtiles.clear();
+
float newxmag = tile->xmag/xrefinementRatio;
float newymag = tile->ymag/yrefinementRatio;
- int nchild = 0;
-
if(newxmag < 1 || newymag < 1) {
// Super sample the fragments. We want our fragment to exactly
// place 1./newxmag samples into each pixel, so we need to
@@ -499,7 +490,7 @@
}
}
- Tile* childtile = &childtiles[nchild];
+ Tile* childtile = new Tile;
childtile->xstart = x;
childtile->xend = xend;
childtile->ystart = y;
@@ -507,7 +498,7 @@
childtile->xmag = newxmag;
childtile->ymag = newymag;
computePriority(childtile, frag);
- nchild++;
+ childtiles.push_back(childtile);
image->set(frag);
}
}
@@ -544,7 +535,7 @@
// NOTE(boulos): We know that we don't just stop anymore due
// to super sampling
singleSampler->renderFragment(context, frag);
- Tile* childtile = &childtiles[nchild];
+ Tile* childtile = new Tile;
childtile->xstart = x;
childtile->xend = xend;
childtile->ystart = y;
@@ -552,15 +543,16 @@
childtile->xmag = newxmag;
childtile->ymag = newymag;
computePriority(childtile, frag);
- nchild++;
+ childtiles.push_back(childtile);
image->set(frag);
}
}
}
- if(nchild)
- insertIntoQueue(childtiles, nchild);
+ // Check size before inserting to avoid lock contention
+ if(childtiles.size())
+ insertIntoQueue(childtiles, static_cast<size_t>(context.proc));
}
  // This can potentially happen before the other processors are finished
Modified: trunk/Engine/ImageTraversers/DeadlineImageTraverser.h
==============================================================================
--- trunk/Engine/ImageTraversers/DeadlineImageTraverser.h (original)
+++ trunk/Engine/ImageTraversers/DeadlineImageTraverser.h	Thu Jul 19 19:05:59 2007
@@ -34,6 +34,7 @@
#include <Interface/ImageTraverser.h>
#include <Interface/Fragment.h>
#include <Core/Thread/Mutex.h>
+#include <Core/Util/ApproximatePriorityQueue.h>
#include <Core/Util/PriorityQueue.h>
#include <sgi_stl_warnings_off.h>
@@ -94,7 +95,8 @@
SCIRun::Mutex qlock;
Tile* tiles;
//Tile** queue;
- PriorityQueue<Tile*, float> queue;
+ //PriorityQueue<Tile*, float> queue;
+ ApproximatePriorityQueue<Tile*, float> queue;
int next_tile;
//int qsize;
int total_tiles;
@@ -109,8 +111,9 @@
};
PriorityScheme priority;
- void insertIntoQueue(Tile* oldtile, int numtiles);
- Tile* popFromQueue(Tile*& childtiles);
+ void insertIntoQueue(Tile* tile, size_t thread_id);
+ void insertIntoQueue(vector<Tile*>& tiles, size_t thread_id);
+ Tile* popFromQueue(size_t thread_id);
void computePriority(Tile* tile, const Fragment& frag) const;
};