FYI: C++ / Win32
So I'm writing this threadpool class and I'm just stuck. It is the ONLY thing wrong. Anyway, to sync the job array and count I'm using a Mutex which I am creating like so:
[code]m_hMutex = CreateMutex( NULL, TRUE, NULL );[/code]
m_hMutex is the first thing initialized in the class, then the last line in Initialize() is ReleaseMutex( m_hMutex ); to release ownership.
Here is the problem: Even with argument two being FALSE, every single time a thread wait's for the mutex, it just halts at wait (Using INFINITE, do not want timeouts) unless it is the creating thread. I made sure there were no missing or skipped releases where it is waited for.
Unfortunately I must sleep, but thanks for any given thoughts / help.
Look if you can use a CRITICAL_SECTION. It's easier than a mutex and more light-weighted :)
Did you check if your handle is maybe NULL? Are you not doing CloseHandle before waiting for it?
Maybe try out if [url=http://msdn.microsoft.com/en-us/library/ms686927(VS.85).aspx]this example[/url] is working right for you.
Additionally you could use boost::thread instead of reinventing the wheel, or at least look at its implementation. It's what's going to become part of the standard library.
[QUOTE=ZeekyHBomb;20284344]Look if you can use a CRITICAL_SECTION. It's easier than a mutex and more light-weighted :)
Did you check if your handle is maybe NULL? Are you not doing CloseHandle before waiting for it?
Maybe try out if [url=http://msdn.microsoft.com/en-us/library/ms686927(VS.85).aspx]this example[/url] is working right for you.[/QUOTE]
Handle returns are never NULL, they are of INVALID_HANDLE_VALUE (if i remember it is 0xFFFFFFFF) and I check for it. Also why would I want to close the handle to the mutex, ReleaseMutex is suppose to release the ownership.
Also, I rather not use third-party libraries, but I'll look into what boost is doing.
[b]Edit:[/b] I will also look into Critical Section, I didn't at first because MSDN said it couldn't be used across threads for shared data, unless I read it wrong.
[quote=http://msdn.microsoft.com/en-us/library/ms682411(VS.85).aspx]If the function fails, the return value is NULL. To get extended error information, call GetLastError.[/quote]
Yes, CreateMutex can return NULL. Or the page is wrong.
You close the handle after you're completely done with it. I was just making sure ;)
I am using a CRITICAL_SECTION to protect the access to a std::queue fine. I think the difference is that you cannot use a CRITICAL_SECTION across different processes, but across threads within one process it's fine.
[editline]18:04PM[/editline]
[url=http://msdn.microsoft.com/en-us/library/ms682530(VS.85).aspx]Yep[/url]
I think I found my problem. When the currently executing thread changes to one of my pool threads all my pointers change. I was under the impression that if the pointer's were within the same process that wouldn't happen.
[b]EDIT:[/b] I'll post my code (I didn't want to cause I'm a douche like that)
[code]
/* --------------------------------------------------
CThreadPool - A class that manages a set or pool
of threads. The pool keeps them alive and
gives them work
Header File
-------------------------------------------------- */
#ifndef CTHREADPOOL_H
#define CTHREADPOOL_H
#ifdef _WIN32
#pragma once
#endif
#include "shareddefs.h"
#include "CThread.h"
class CWorker {
public:
virtual void Execute() = 0;
virtual void Release() {};
};
class CWorkerTestA : public CWorker {
public:
std::string* test;
virtual void Execute() {
MessageBeep( MB_OK );
for(int i=0;i<100;i++)
(*test) += 'a';
}
};
class CWorkerTestB : public CWorker {
public:
std::string* test;
virtual void Execute() {
MessageBeep( MB_OK );
for(int i=0;i<100;i++)
(*test) += 'b';
}
};
class CThreadPool {
public:
CThreadPool() : m_pCriticalSection(NULL), m_pThreadPool(NULL), m_pThreadJobs(NULL), m_iThreads(2), m_iAvailableJobs(0) {}
private:
~CThreadPool() {}
public:
void Initialize();
void Release();
int ThreadCount();
int JobCount();
void AddJob( CWorker* pWorker );
bool RequestJob( CWorker** ppWorker );
static DWORD WINAPI CThreadPool::ThreadExecute( void* vpPool );
private:
CRITICAL_SECTION* m_pCriticalSection; // Syncronization of jobs
CThread** m_pThreadPool;
CWorker** m_pThreadJobs;
int m_iThreads, m_iAvailableJobs;
};
#endif // CTHREADPOOL_H
[/code]
[code]
/* --------------------------------------------------
CThreadPool - A class that manages a set or pool
of threads. The pool keeps them alive and
gives them work
Code File
-------------------------------------------------- */
#include "CThreadPool.h"
void CThreadPool::Initialize() {
m_pCriticalSection = new CRITICAL_SECTION;
AssertMsg( m_pCriticalSection != NULL, "Unable to allocate sync object!" );
InitializeCriticalSection( m_pCriticalSection );
EnterCriticalSection( m_pCriticalSection );
m_pThreadJobs = new CWorker*[m_iThreads*2]; //(CWorker**)malloc( sizeof(CWorker*)*m_iThreads*2 );
m_pThreadPool = new CThread*[m_iThreads]; //(CThread**)malloc( sizeof(CThread*)*m_iThreads );
for(int i=0;i<m_iThreads*2;i++)
m_pThreadJobs[i] = NULL;
for(int i=0;i<m_iThreads;i++) {
m_pThreadPool[i] = new CThread( CThreadPool::ThreadExecute, static_cast<void*>(this), 1 );
m_pThreadPool[i]->Run();
}
LeaveCriticalSection( m_pCriticalSection );
}
void CThreadPool::Release() {
EnterCriticalSection( m_pCriticalSection );
for(int i=0;i<m_iThreads;i++) {
CThread* pThread = m_pThreadPool[i];
m_pThreadPool[i] = NULL;
if( pThread != NULL )
pThread->Release();
}
delete[] m_pThreadPool;
for(int i=0;i<m_iAvailableJobs;i++) {
CWorker* pWorker = m_pThreadJobs[i];
m_pThreadJobs[i] = NULL;
if( pWorker != NULL )
pWorker->Release();
}
delete[] m_pThreadJobs;
LeaveCriticalSection( m_pCriticalSection );
DeleteCriticalSection( m_pCriticalSection );
delete m_pCriticalSection;
delete this;
}
void CThreadPool::AddJob( CWorker* pWorker ) {
EnterCriticalSection( m_pCriticalSection );
if( m_iAvailableJobs < m_iThreads*2 ) {
m_pThreadJobs[m_iAvailableJobs] = pWorker;
m_iAvailableJobs++;
}
LeaveCriticalSection( m_pCriticalSection );
}
bool CThreadPool::RequestJob( CWorker** ppWorker ) {
EnterCriticalSection( m_pCriticalSection );
if( m_iAvailableJobs > 0 ) {
*ppWorker = m_pThreadJobs[0]; // Assign worker
m_pThreadJobs = NULL; // Set index 0 to NULL
for(int i=1;i<m_iThreads*2;i++) { // Move jobs down queue
if( m_pThreadJobs[i] )
m_pThreadJobs[i-1] = m_pThreadJobs[i];
m_pThreadJobs[i] = NULL;
}
m_iAvailableJobs--;
}
LeaveCriticalSection( m_pCriticalSection );
return true;
}
DWORD WINAPI CThreadPool::ThreadExecute( void* vpPool ) {
//CThreadPool* pPool = (CThreadPool*)( vpPool );
CThreadPool* pPool = static_cast<CThreadPool*>( vpPool );
CWorker* pWorker = NULL;
while( pPool->RequestJob( &pWorker ) ) {
if( pWorker )
pWorker->Execute();
}
return 0;
}
[/code]
[b][u]Here is how I am executing in main.cpp[/u][/b]
[code]
std::string str;
CThreadPool* test = new CThreadPool();
test->Initialize();
CWorkerTestA* testa = new CWorkerTestA();
testa->test = &str;
CWorkerTestB* testb = new CWorkerTestB();
testb->test = &str;
test->AddJob( testa );
test->AddJob( testb );
// ...
test->Release();
[/code]
[QUOTE=ZeekyHBomb;20287740]I think the difference is that you cannot use a CRITICAL_SECTION across different processes, but across threads within one process it's fine.
[/QUOTE]
Yep thats correct. CS's are more light weight, you should use them if you don't really need a mutex.
They are also cross platform :)
TehBigA, so what problem do you have now?
(Also you may want to replace your fixed list of jobs, with a std::queue so that you can have infinite jobs).
Read up on the worker/consumer thread pool.
Cross platform? The CRITICAL_SECTION is part of the WinAPI.
I've switched from Win32 Threads to [url=http://openmp.org/]OpenMP[/url] to do my parallel processing. Works on the platforms I care about.
[url=http://calvados.di.unipi.it/dokuwiki/doku.php?id=ffnamespace:about]Fastflow[/url] also looks very interesting, I'll take a look at it some when. Anybody here tried it already?
When you remove from a queue they are destructed, don't want that =[
Also I just realised I'm not cleaning up the CWorker when I'm done, but I haven't gotten it to run that far yet.
My current problem is that when the worker thread runs it calls the threadpool's RequestJob, BUT I get this error when EnterCriticalSection( ... ) is called:[code]Unhandled exception at 0x77a62272 in MyGame.exe: 0xC0000005: Access violation writing location 0x00000138.[/code]
[b]EDIT: [/b]
Also every variable in the CRITICAL_SECTION struct is:
Error: expression cannot be evaluated
Anyone have an idea why all the pointers change across the worker threads?
Sorry, you need to Log In to post a reply to this thread.