О грамотном остановке потоков

Тема в разделе "MS Visual C++", создана пользователем caHko, 23 дек 2008.

Статус темы:
Закрыта.
  1. caHko

    caHko Гость

    Необходимо решить проблему эффективного параллельного выполнения маленьких задач,на примере qsort
    моя идея такая
    [codebox]#include "stdafx.h"
    class CriticalSection
    {
    public:
    CriticalSection()
    {
    InitializeCriticalSection(&_cs);
    }

    ~CriticalSection()
    {
    DeleteCriticalSection(&_cs);
    }

    void Lock()
    {
    EnterCriticalSection(&_cs);
    }

    void Unlock()
    {
    LeaveCriticalSection(&_cs);
    }
    [sub][/sub]private:
    CRITICAL_SECTION _cs;
    };

    class Event
    {
    public:
    Event()
    {
    _event = CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    ~Event()
    {
    CloseHandle(_event);
    }

    void Signal()
    {
    SetEvent(_event);
    }

    void Reset()
    {
    ResetEvent(_event);
    }

    void Wait(int timeout = INFINITE)
    {
    WaitForSingleObject(_event, timeout);
    }

    static int WaitForTwo(const Event& ev1, const Event& ev2, int timeout = INFINITE)
    {
    HANDLE h[2];
    h[0] = ev1._event;
    h[1] = ev2._event;
    int res = WaitForMultipleObjects(2, h, FALSE, timeout);
    switch (res)
    {
    case WAIT_TIMEOUT:
    return 0;
    case WAIT_FAILED:
    return -1;
    default:
    return res - WAIT_OBJECT_0 + 1;
    }
    }

    private:
    HANDLE _event;
    };

    class ThreadPool;

    class Callback
    {
    public:
    virtual void Invoke(ThreadPool* pool) = 0;
    };

    class MessageQueue
    {
    public:
    void Push(Callback* callback)
    {
    _cs.Lock();
    _queue.push(callback);
    _notEmpty.Signal();
    _cs.Unlock();
    }

    Callback* Pop()
    {
    while (true)
    {
    _notEmpty.Wait();
    _cs.Lock();
    if (!_queue.empty())
    {
    Callback* obj = _queue.front();
    _queue.pop();
    if (_queue.empty())
    _notEmpty.Reset();
    _cs.Unlock();
    return obj;
    }
    _cs.Unlock();
    }
    }

    private:
    std::queue<Callback*> _queue;

    Event _notEmpty;
    CriticalSection _cs;
    };


    class ExecutorThread
    {
    public:
    ExecutorThread() : _messageQueue(NULL), _pool(NULL), _thread(NULL)
    {
    }

    void Start(int number, ThreadPool* pool, MessageQueue* mq)
    {
    _number = number;
    _pool = pool;
    _messageQueue = mq;
    _thread = CreateThread(NULL, 0, ThreadProc, this, 0, NULL);
    }

    protected:
    void Run()
    {
    while (true)
    {
    Callback* cb = _messageQueue->Pop();
    if (cb == NULL) return;
    std::cout << "Running in " << _number << " thread...\n";
    cb->Invoke(_pool);
    }
    }

    private:
    static DWORD WINAPI ThreadProc(LPVOID lpParameter)
    {
    ExecutorThread* zis = (ExecutorThread*)lpParameter;
    zis->Run();
    return 0;
    }

    MessageQueue* _messageQueue;
    ThreadPool* _pool;
    HANDLE _thread;
    int _number;
    };

    class ThreadPool
    {
    public:
    ThreadPool(int threads)
    {
    _threads.resize(threads);
    for (unsigned int i = 0; i < _threads.size(); i++)
    _threads.Start(i+1, this, &_messageQueue);
    }

    void Invoke(Callback* callback)
    {
    _messageQueue.Push(callback);
    }

    void Stop()
    {
    }

    public:
    MessageQueue _messageQueue;
    std::vector<ExecutorThread> _threads;
    };


    class PrintCallback : public Callback
    {
    public:
    PrintCallback(int i) : _i(i)
    { }

    virtual void Invoke(ThreadPool* pool)
    {
    Sleep(200);
    std::cout << _i << "\n";
    pool->Invoke(new PrintCallback(_i+1));
    pool->Invoke(new PrintCallback(_i+5));
    }

    private:
    int _i;
    };

    int _tmain(int argc, _TCHAR* argv[])
    {
    ThreadPool pool(20);
    pool.Invoke(new PrintCallback(0));
    Sleep(1000);
    pool.Stop();

    return 0;
    }[/codebox]грамотно остановить много потоков,неюзать слипы ит.д? и как применить qsor,т.е cделать сортировку при помощи потоковt?,(не совсем я понял функции threadproc и т.д) (сори за большой размер сообщения)
     
  2. @LE}{@NDER

    @LE}{@NDER Гость

    Имхо, попытка много-поточной сортировки, идея не самая удачная, разве что в образовательных целях, чтобы научиться синхронизировать потоки.
    Потоки будут конкурировать в попытке доступиться к ресурсу, что значительно снизит эфективность программы, при этом каждый поток будет кушать процессорное время. Даже если ресурс условно поделить между потоками, чтобы каждый отсортировал свою часть, полученные куски придется смерджить(связать) и.. опять таки сортировать....
    Я считаю, тут ничто не превзойдёт по эфективности обычную быструю сортировку.
    С другой стороны, задача многопоточного поиска в ресурсе выглядит намного привлекательнее.

    Что же касается функции ThreadProc - имя функции как константный указатель передается в метод CreateThread() - читай про метод в МСДН. Также классика - книга Рихтера по системному программированию.
    Эта функция (ThreadProc - имя может быть любым), если является членом класс обязана быть статической, дабы не принимала неявный this. Она принимает параметром LPVOID (длинный указатель на void), который затем в теле функции приводится к типу ExecutorThread*. Еще один ньюанс (грабли на которые сам когда-то наступил) - поскольку мы пытаемся юзать функцию, как член класса, 4м параметром CreateThread() мы передаем параметр потоковой функции, и в данном случае передать надо указатель на окно, из которого вызываем функцию - то есть тот же this.
    Код (Text):
    _thread = CreateThread(NULL, 0, ThreadProc, this, 0, NULL);
     
Статус темы:
Закрыта.

Поделиться этой страницей