• Multi-threading and mutexes
    9 replies, posted
FYI: C++ / Win32 So I'm writing this threadpool class and I'm just stuck. It is the ONLY thing wrong. Anyway, to sync the job array and count I'm using a Mutex which I am creating like so: [code]m_hMutex = CreateMutex( NULL, TRUE, NULL );[/code] m_hMutex is the first thing initialized in the class, then the last line in Initialize() is ReleaseMutex( m_hMutex ); to release ownership. Here is the problem: Even with argument two being FALSE, every single time a thread wait's for the mutex, it just halts at wait (Using INFINITE, do not want timeouts) unless it is the creating thread. I made sure there were no missing or skipped releases where it is waited for. Unfortunately I must sleep, but thanks for any given thoughts / help.
Look if you can use a CRITICAL_SECTION. It's easier than a mutex and more light-weighted :) Did you check if your handle is maybe NULL? Are you not doing CloseHandle before waiting for it? Maybe try out if [url=http://msdn.microsoft.com/en-us/library/ms686927(VS.85).aspx]this example[/url] is working right for you.
Additionally you could use boost::thread instead of reinventing the wheel, or at least look at its implementation. It's what's going to become part of the standard library.
[QUOTE=ZeekyHBomb;20284344]Look if you can use a CRITICAL_SECTION. It's easier than a mutex and more light-weighted :) Did you check if your handle is maybe NULL? Are you not doing CloseHandle before waiting for it? Maybe try out if [url=http://msdn.microsoft.com/en-us/library/ms686927(VS.85).aspx]this example[/url] is working right for you.[/QUOTE] Handle returns are never NULL, they are of INVALID_HANDLE_VALUE (if i remember it is 0xFFFFFFFF) and I check for it. Also why would I want to close the handle to the mutex, ReleaseMutex is suppose to release the ownership. Also, I rather not use third-party libraries, but I'll look into what boost is doing. [b]Edit:[/b] I will also look into Critical Section, I didn't at first because MSDN said it couldn't be used across threads for shared data, unless I read it wrong.
[quote=http://msdn.microsoft.com/en-us/library/ms682411(VS.85).aspx]If the function fails, the return value is NULL. To get extended error information, call GetLastError.[/quote] Yes, CreateMutex can return NULL. Or the page is wrong. You close the handle after you're completely done with it. I was just making sure ;) I am using a CRITICAL_SECTION to protect the access to a std::queue fine. I think the difference is that you cannot use a CRITICAL_SECTION across different processes, but across threads within one process it's fine. [editline]18:04PM[/editline] [url=http://msdn.microsoft.com/en-us/library/ms682530(VS.85).aspx]Yep[/url]
I think I found my problem. When the currently executing thread changes to one of my pool threads all my pointers change. I was under the impression that if the pointer's were within the same process that wouldn't happen. [b]EDIT:[/b] I'll post my code (I didn't want to cause I'm a douche like that) [code] /* -------------------------------------------------- CThreadPool - A class that manages a set or pool of threads. The pool keeps them alive and gives them work Header File -------------------------------------------------- */ #ifndef CTHREADPOOL_H #define CTHREADPOOL_H #ifdef _WIN32 #pragma once #endif #include "shareddefs.h" #include "CThread.h" class CWorker { public: virtual void Execute() = 0; virtual void Release() {}; }; class CWorkerTestA : public CWorker { public: std::string* test; virtual void Execute() { MessageBeep( MB_OK ); for(int i=0;i<100;i++) (*test) += 'a'; } }; class CWorkerTestB : public CWorker { public: std::string* test; virtual void Execute() { MessageBeep( MB_OK ); for(int i=0;i<100;i++) (*test) += 'b'; } }; class CThreadPool { public: CThreadPool() : m_pCriticalSection(NULL), m_pThreadPool(NULL), m_pThreadJobs(NULL), m_iThreads(2), m_iAvailableJobs(0) {} private: ~CThreadPool() {} public: void Initialize(); void Release(); int ThreadCount(); int JobCount(); void AddJob( CWorker* pWorker ); bool RequestJob( CWorker** ppWorker ); static DWORD WINAPI CThreadPool::ThreadExecute( void* vpPool ); private: CRITICAL_SECTION* m_pCriticalSection; // Syncronization of jobs CThread** m_pThreadPool; CWorker** m_pThreadJobs; int m_iThreads, m_iAvailableJobs; }; #endif // CTHREADPOOL_H [/code] [code] /* -------------------------------------------------- CThreadPool - A class that manages a set or pool of threads. The pool keeps them alive and gives them work Code File -------------------------------------------------- */ #include "CThreadPool.h" void CThreadPool::Initialize() { m_pCriticalSection = new CRITICAL_SECTION; AssertMsg( m_pCriticalSection != NULL, "Unable to allocate sync object!" ); InitializeCriticalSection( m_pCriticalSection ); EnterCriticalSection( m_pCriticalSection ); m_pThreadJobs = new CWorker*[m_iThreads*2]; //(CWorker**)malloc( sizeof(CWorker*)*m_iThreads*2 ); m_pThreadPool = new CThread*[m_iThreads]; //(CThread**)malloc( sizeof(CThread*)*m_iThreads ); for(int i=0;i<m_iThreads*2;i++) m_pThreadJobs[i] = NULL; for(int i=0;i<m_iThreads;i++) { m_pThreadPool[i] = new CThread( CThreadPool::ThreadExecute, static_cast<void*>(this), 1 ); m_pThreadPool[i]->Run(); } LeaveCriticalSection( m_pCriticalSection ); } void CThreadPool::Release() { EnterCriticalSection( m_pCriticalSection ); for(int i=0;i<m_iThreads;i++) { CThread* pThread = m_pThreadPool[i]; m_pThreadPool[i] = NULL; if( pThread != NULL ) pThread->Release(); } delete[] m_pThreadPool; for(int i=0;i<m_iAvailableJobs;i++) { CWorker* pWorker = m_pThreadJobs[i]; m_pThreadJobs[i] = NULL; if( pWorker != NULL ) pWorker->Release(); } delete[] m_pThreadJobs; LeaveCriticalSection( m_pCriticalSection ); DeleteCriticalSection( m_pCriticalSection ); delete m_pCriticalSection; delete this; } void CThreadPool::AddJob( CWorker* pWorker ) { EnterCriticalSection( m_pCriticalSection ); if( m_iAvailableJobs < m_iThreads*2 ) { m_pThreadJobs[m_iAvailableJobs] = pWorker; m_iAvailableJobs++; } LeaveCriticalSection( m_pCriticalSection ); } bool CThreadPool::RequestJob( CWorker** ppWorker ) { EnterCriticalSection( m_pCriticalSection ); if( m_iAvailableJobs > 0 ) { *ppWorker = m_pThreadJobs[0]; // Assign worker m_pThreadJobs = NULL; // Set index 0 to NULL for(int i=1;i<m_iThreads*2;i++) { // Move jobs down queue if( m_pThreadJobs[i] ) m_pThreadJobs[i-1] = m_pThreadJobs[i]; m_pThreadJobs[i] = NULL; } m_iAvailableJobs--; } LeaveCriticalSection( m_pCriticalSection ); return true; } DWORD WINAPI CThreadPool::ThreadExecute( void* vpPool ) { //CThreadPool* pPool = (CThreadPool*)( vpPool ); CThreadPool* pPool = static_cast<CThreadPool*>( vpPool ); CWorker* pWorker = NULL; while( pPool->RequestJob( &pWorker ) ) { if( pWorker ) pWorker->Execute(); } return 0; } [/code] [b][u]Here is how I am executing in main.cpp[/u][/b] [code] std::string str; CThreadPool* test = new CThreadPool(); test->Initialize(); CWorkerTestA* testa = new CWorkerTestA(); testa->test = &str; CWorkerTestB* testb = new CWorkerTestB(); testb->test = &str; test->AddJob( testa ); test->AddJob( testb ); // ... test->Release(); [/code]
[QUOTE=ZeekyHBomb;20287740]I think the difference is that you cannot use a CRITICAL_SECTION across different processes, but across threads within one process it's fine. [/QUOTE] Yep thats correct. CS's are more light weight, you should use them if you don't really need a mutex. They are also cross platform :) TehBigA, so what problem do you have now? (Also you may want to replace your fixed list of jobs, with a std::queue so that you can have infinite jobs). Read up on the worker/consumer thread pool.
Cross platform? The CRITICAL_SECTION is part of the WinAPI. I've switched from Win32 Threads to [url=http://openmp.org/]OpenMP[/url] to do my parallel processing. Works on the platforms I care about. [url=http://calvados.di.unipi.it/dokuwiki/doku.php?id=ffnamespace:about]Fastflow[/url] also looks very interesting, I'll take a look at it some when. Anybody here tried it already?
When you remove from a queue they are destructed, don't want that =[ Also I just realised I'm not cleaning up the CWorker when I'm done, but I haven't gotten it to run that far yet. My current problem is that when the worker thread runs it calls the threadpool's RequestJob, BUT I get this error when EnterCriticalSection( ... ) is called:[code]Unhandled exception at 0x77a62272 in MyGame.exe: 0xC0000005: Access violation writing location 0x00000138.[/code] [b]EDIT: [/b] Also every variable in the CRITICAL_SECTION struct is: Error: expression cannot be evaluated
Anyone have an idea why all the pointers change across the worker threads?
Sorry, you need to Log In to post a reply to this thread.