Меню

Синхронизация потоков в boost



boost::thread — синхронизация запуска потоков в пуле

int pool::add_thread() <
// wait the thread for 1 second
boost::system_time wait_time = boost::get_system_time() + boost::posix_time::milliseconds(1000);
try <
// create thread
boost::shared_ptr temp_thread(new boost::thread(boost::bind(&thread_starter, this)));

// wait for thread’s report or timeout
boost::unique_lock lock(start_flag_mutex);
start_flag.timed_wait(lock, wait_time);

// thread is ready, put it to other ready threads
thread_pool.push_back(temp_thread);
> catch (boost::thread_resource_error) <
return -1;
>
>

void pool::thread_routine() <
// notify the main thread, that we’ve started
start_flag.notify_one();
// lock mutex to wait for conditional variable signal
boost::mutex::scoped_lock lock(thr_wait_mutex);
// wait for a signal to start
thr_wait.wait(lock);

// main task executing
.

Основная проблема синхронизации:

  1. запускается несколько потоков
  2. часто бывает, что основной поток, запустил все потоки из пула, и, не передавая им управления, послал сигнал на выполнение задач. После этого, дочерние потоки зависают в ожидании сигнала, который уже был дан. Попытался решить блокировкой основного потока по сигналу, который дает создаваемый дочерний поток на старте (отражено в приведенном коде).
  3. приведенный код тоже имеет недостаток:
    запущенный дочерний поток иногда успевает проскочить и послать сигнал раньше, чем основной поток начнет ожидать его сигнала. Результат: основной поток, прождав секунду, считает что дочерний поток не готов к выполнению задачи.

Пока в голову приходят три варианта решения:

  • Использовть не сам boost::thread, а созданный на его базе класс, который имел бы метод для рапорта о готовности. Но опять же, остается вероятность, что при выполнении, между двумя командами дочернего потока (установка внутренней переменной готовности и блокировка по переменной состояния), вклинится сигнал основного потока, и будем иметь зависание дочернего потока
  • записывать id в какой-нить внутренний массив, затем ждать сигнала. основной поток проверяет массив на готовность дочерних потоков, затем дает сигнал на выполнение задач. остается та же вероятность вклинивания основного потока, что и в предыдущем пункте.
  • соединить сигнал готовности, посылаемый из дочернего потока с одним из предыдущих вариантов. В таком случае:
    — если сигнал из дочернего потока будет дан раньше, чем его начнет ожидать основной поток, то после истечения таймаута и проверки потока на готовность вручную, состояние потока будет определено, как «готов к выполенинию»
    — если выполнение основного потока вклинится сразу после установки флага готовности, он все равно будет ждать сигнала от дочернего потока.
    НО! есть опасность, что после установки флага готовности, дочерний поток так и не доберется до посылки сигнала (процессорное время будет занято выполнением других потоков). А, основной поток решит, что он уже готов и пошлет сигнал => дочерний поток повиснет на ожидании сигнала, который уже был дан.

Итак, вопрос: как синхронизировать запуск потоков так, чтобы основной поток достоверно знал о запуске и готовности всех дочерних потоков, прежде чем давать им сигнал на выполнение задач.

Источник

Синхронизация потоков в Boost

Я пытаюсь создать приложение, которое создает один основной поток и 10 подчиненных потоков. Я хочу запустить подчиненные потоки один раз после запуска основного потока. Таким образом, для каждого выполнения основного потока каждый подчиненный поток будет выполняться один раз. Я пытался справиться с этим с двумя разными conditional variables , Таким образом, один используется для подчиненных потоков, чтобы они могли ждать, пока основной поток не уведомит их, а другой — conditional variable для основного потока, который сигнализируется после того, как каждый дочерний элемент завершает свою задачу, поэтому основной поток может проверить, все ли подчиненные потоки выполнены или нет. Код выглядит следующим образом:

Проблема в том, что где-то код останавливается и он застревает в тупике.

Также я попытался изменить способ реализации. Итак, я использовал atomic который подсчитывает количество потоков, выполнивших свою задачу, и в основном потоке я проверяю, равно ли число потоков количеству обновленных потоков, но этот метод также где-то застрял и зашел в тупик.
Код можно найти здесь:

Даже я пытался решить проблему с барьерами, но это не решило мою проблему. код выглядит следующим образом:

[ОБНОВЛЕНО] Итак, я просто использовал mutex , conditional variables а также data_ready в структуре следующим образом и теперь код работает. Я думаю, что была ошибка с использованием pointer to mutex и так далее. код выглядит следующим образом:

Решение

@ он даже с барьером, он застрял в тупике. — ммостаджаб 5 минут назад

Поскольку вы ничего не показываете о том, что вы там делаете, позвольте мне дать вам толчок для стартапа, включив большую часть всех полученных вами предложений:

Имейте в виду, я не знаю, чего вы здесь добиваетесь. Добавляя барьер, я имел в виду это: как использовать форсированный барьер

Другие решения

Я попытался изменить ответ @sehe, чтобы он решил именно ту проблему, которую я ищу, и я получил следующий код:

Источник

Синхронизация в потоках

В программе я хочу что бы повар приготовил еду, а официант подал её. При этом не должно быть так, что официант отнес еще не сделанную еду. Но у меня такое получается!
Можете подсказать с синхронизацией?

Синхронизация в потоках
Доброго времени уважаемые гуру озадачился таким вопросом есть 2 класса от потоков, описаны в.

Синхронизация куска кода в потоках (Си)
Здравствуйте, нужна помощь. Мне надо синхронизировать кусок кода в потоках. Надо чтобы потоки после.

Синхронизация работы циклов в разных потоках
Помогите советами. как синхронизировать циклы в разных потоках. Есть основной метод в «главном.

Модуль EVO II синхронизация 50гц, На какой ножке контроллера синхронизация шим двигателя?
Частый дефект для модулей EVO II с коллекторным двигателем — нет управления двигателем при помехах.

Заказываю контрольные, курсовые, дипломные и любые другие студенческие работы здесь.

Литература о потоках
Хотел бы найти хорошую книгу в которой хорошо изложены вопросы паралельного программирования. В.

Читайте также:  Синхронизация контактов iphone с другим телефоном

Исключения в потоках
Здравствуйте! Столкнулся со следующей проблемой: Написал класс, объект которого должен.

Ошибки в потоках
void test() < Thread ^t = gcnew Thread(gcnew ThreadStart(this, &Form1::hide_button)); t->Start();.

Циклы в потоках
Использую многопоточность.Необходимо включить в тело потока 2-3 цикла,не навредит ли это работе.

Источник

Синхронизация потоков в Boost

Я пытаюсь создать приложение, которое создает один основной поток и 10 подчиненных потоков. Я хочу запустить подчиненные потоки один раз после запуска основного потока. Таким образом, для каждого выполнения основного потока каждый подчиненный поток будет выполняться один раз. Я пытался справиться с этим с двумя разными conditional variables , Таким образом, один используется для подчиненных потоков, чтобы они могли ждать, пока основной поток не уведомит их, а другой — conditional variable для основного потока, который сигнализируется после того, как каждый дочерний элемент завершает свою задачу, поэтому основной поток может проверить, все ли подчиненные потоки выполнены или нет. Код выглядит следующим образом:

Проблема в том, что где-то код останавливается и он застревает в тупике.

Также я попытался изменить способ реализации. Итак, я использовал atomic который подсчитывает количество потоков, выполнивших свою задачу, и в основном потоке я проверяю, равно ли число потоков количеству обновившихся потоков, но этот метод также где-то застрял и зашел в тупик. Код можно найти здесь:

Даже я пытался решить проблему с барьерами, но это не решило мою проблему. код выглядит следующим образом:

[ОБНОВЛЕНО] Итак, я просто использовал mutex , conditional variables а также data_ready в структуре следующим образом и теперь код работает. Я думаю, что была ошибка с использованием pointer to mutex и так далее. код выглядит следующим образом:

Источник

boost::thread синхронизация

Надо реализовать: главный поток в цикле выполняет какие-то действия, затем управление передается нескольким другим потокам, как только они выполнят какие-то действия, управление возвращается главному потоку. Система критична по времени, цикл главного потока может выполняться много раз, поэтому потоки создаются перед началом цикла и завершаются в конце работы главного потока. Реализовани примерно так:

В результате ловлю дедлок, в случае, если condition1.wait(mutex1); не успела выполниться, до того как отработали побочные потоки, которые отправили condition1.notify_one();
Может в целом схема не верная? Можно конечно каждый раз создавать побочные потоки с делать join(), но время создания потока соизмеримо с временем выолнения потоками заданной функции, а в случае синхронизации вроде бы, поправьте если бред, должно быть быстрее. Заранее спасибо.

Вы неправильно используете condition variable.

Правильно — «сигналить» condition variable только при залоченном мьютексе (тот который передается в wait).
При этом вы должны где-то хранить информацию о состоянии, изменение которого сигнализируется с помощью condition variable.
Когда wait выполнится, нужно проверять состояние — возможны ложные срабатывания.
Кроме того такой дизайн позволяет ожидать разных событий с использованием общей condition variable.
(Например состояние S — это интовая переменная; CV сигналится каждый раз при изменении S; один тред ждет когда S станет равен нулю, а другой — когда S станет больше 10, причем используют они общую CV).

Вот пример правильной работы с CV (копипаста отсюда).

Конишуа
> Правильно — «сигналить» condition variable только при залоченном мьютексе (тот
> который передается в wait).
Спасибо, это помогло.

Источник

Синхронизация потоков в Boost

Я пытаюсь создать приложение, которое создает один основной поток и 10 подчиненных потоков. Я хочу запустить подчиненные потоки один раз после запуска основного потока. Таким образом, для каждого выполнения основного потока каждый подчиненный поток будет выполняться один раз. Я попытался справиться с этим с двумя разными conditional variables . Таким образом, один используется для подчиненных потоков, чтобы они могли ждать, пока основной поток не уведомит их, а другой conditional variable для основного потока, который сигнализируется после завершения каждой дочерней задачи, поэтому основной поток может проверить, выполнены ли все подчиненные потоки или нет. Код выглядит следующим образом:

Проблема в том, что где-то код останавливается, и он застрял в тупике.

Кроме того, я попытался изменить способ реализации. Итак, я использовал atomic , который подсчитывает количество потоков, которые закончили свою задачу, и в основном потоке я проверяю, равно ли количество потоков количеству потоков, которые обновили себя, но этот метод также застрял где-то и зашел в тупик. Код можно найти здесь:

Даже, я попытался решить эту проблему с барьерами, но это не решило мою проблему. код выглядит следующим образом:

[UPDATED] Итак, я просто использовал mutex , conditional variables и data_ready в структуре следующим образом, и теперь код работает. Я думаю, что была ошибка с использованием pointer to mutex и так далее. код выглядит следующим образом:

2 ответа

Задавали этот вопрос в интервью, пытались его решить . но безуспешно. Я думал использовать CyclicBarrier Есть три нити Т1 печатает 1,4,7. T2 печатает 2,5,8. и T3 печатает 3,6,9. Как синхронизировать эти три для печати последовательности 1,2,3,4,5,6,7,8,9. Я попытался написать &.

У меня есть потоки NUM_THREADS, со следующими кодами в моем потоке: /* Calculate some_value; */ //Critical section to accummulate all thresholds < boost::mutex::scoped_lock lock(write_mutex); T += some_value; num_threads++; if (num_threads == NUM_THREADS)< T = T/NUM_THREADS; READY = true;.

@sehe даже с барьером, он застрял в тупике. — mmostajab 5 минут назад

Поскольку вы ничего не показываете о том, что вы там делаете, позвольте мне дать вам startup boost, включив большой кусок всех предложений, которые вы получили:

Имейте в виду, я не знаю, чего вы пытаетесь достичь здесь. Добавление барьера я имел это в виду: как использовать _31 барьер

Я попытался изменить ответ @sehe, поэтому он решает именно ту проблему, которую я ищу, и я добился этого кода:

Похожие вопросы:

когда я пытаюсь запустить следующий код в режиме отладки и выпуска в VS2005. Каждый раз, когда я вижу разные выходные данные в консоли, и не похоже, что многопоточность достигается в режиме выпуска.

Читайте также:  Что такое синхронизация атс

Я не вижу синхронизированного вывода, когда я комментирую строку wait(1) в thread() . могу ли я заставить их работать одновременно (один за другим) без необходимости использовать ‘wait(1)’? #include.

Я пытаюсь включить библиотеку потоков Boost в свой проект C++. Мой файл CMake выглядит так: cmake_minimum_required(VERSION 3.6) project(LearningC) find_package(Boost REQUIRED).

Задавали этот вопрос в интервью, пытались его решить . но безуспешно. Я думал использовать CyclicBarrier Есть три нити Т1 печатает 1,4,7. T2 печатает 2,5,8. и T3 печатает 3,6,9. Как.

У меня есть потоки NUM_THREADS, со следующими кодами в моем потоке: /* Calculate some_value; */ //Critical section to accummulate all thresholds < boost::mutex::scoped_lock lock(write_mutex); T +=.

Мне нужен threadpool для моего приложения, и я хотел бы как можно больше полагаться на стандартные (C++11 или boost) вещи. Я понимаю, что существует неофициальный(!) boost класс пула потоков.

Я хотел бы выполнить следующий алгоритм — это должно быть сделано в Java for(int i = 0; i < 100; i++)< create 8 threads which perform a task wait for all threads to finish >Желательно, чтобы.

У меня есть код, подобный приведенному ниже. Приведенный ниже код дает SIGSEGV и указывает на list::push_back. Это правильный способ использовать список boost потоков? struct mythread< static void.

Я пытаюсь синхронизировать два потока (работающих на одной карте C++), используя библиотеку Boost. Я должен сказать, что я не эксперт в C++, и я нахожу документацию boost довольно трудной для.

Строго определенная синхронизация потоков или сериализация-это применение определенных механизмов для обеспечения того, чтобы два одновременно выполняющихся потока или процесса не выполняли.

Источник

Синхронизация потоков с boost::condition_variable

Я провожу несколько экспериментов по многопоточности C++ и не знаю, как решить одну проблему. Допустим, у нас есть пул потоков, который обрабатывает запросы пользователей, используя существующий поток, и создает новый поток, когда нет свободных потоков. Я создал потокобезопасный класс command_queue, в котором есть методы push и pop. pop ожидает, пока очередь пуста, и возвращает, только когда команда доступна или истекло время ожидания. Теперь пришло время реализовать пул потоков. Идея состоит в том, чтобы свободные потоки спали в течение некоторого времени и убивали поток, если после этого периода времени ничего не делается. Вот реализация

здесь мы выходим из процедуры потока, если истекло время ожидания. Это нормально, но есть проблема с созданием нового потока. Допустим, у нас уже есть 2 потока, обрабатывающих пользовательские запросы, они работают в данный момент, но нам нужно выполнить некоторую другую операцию асинхронно. Мы называем

который должен начать новую тему, потому что нет свободных тем, доступных. Когда тема доступна, она вызывает timed_wait по условной переменной, поэтому идея состоит в том, чтобы проверить, есть ли потоки, которые ожидают.

а как это проверить? Документация говорит, что если нет ожидающих потоков, notify_one ничего не делает. Если бы я мог проверить, действительно ли он ничего не сделал, это было бы решением

Насколько я вижу, это невозможно проверить.

Спасибо за ваши ответы.

5 ответов

Вам нужно создать другую переменную (возможно, семафор), которая знает, сколько потоков запущено, затем вы можете проверить это и создать новый поток, если необходимо, перед вызовом notify.

Другой, лучший вариант — просто не выводить потоки по истечении времени ожидания. Они должны остаться в живых в ожидании уведомления. Вместо того, чтобы выйти, когда время ожидания уведомления истекло, проверьте переменную, чтобы увидеть, работает ли программа по-прежнему или она «закрывается», если она все еще работает, затем начните ожидание снова.

Источник

Компьютерные технологии (архив 2012г.)

6 Лекция — Библиотека Boost. Многопоточный сервер на boost.

Библиотека Boost.

Boost — собрание библиотек, расширяющих функциональность C++.

Проект является своего рода «испытательным полигоном», и часть библиотек являются кандидатами на включение в следующий стандарт C++. Некое дополнение к STL.

Библиотеки Boost охватывают следующее:

  • Алгоритмы
  • Обход ошибок в компиляторах, не соответствующих стандарту
  • Многопоточное программирование
  • Контейнеры
  • Юнит-тестирование
  • Структуры данных
  • Функциональные объекты
  • Обобщённое программирование
  • Графы
  • Работа с геометрическими данными
  • Ввод-вывод
  • Межъязыковая поддержка
  • Итераторы
  • Математические и числовые алгоритмы
  • Работа с памятью
  • Синтаксический и лексический разбор
  • Метапрограммирование на основе препроцессора
  • «Умные указатели»
  • Обработка строк и текста
  • Метапрограммирование на основе шаблонов
  • и т.д.

Библиотека для C++ (ООП библиотека). Это некая ООП надстройка над PThreads. Позволяет писать в стиле ООП, что упрощает синтаксис и делает программы более безопасными.

Компилятору необходимо указывать библиотеку, например:

c++ -lboost_system -lboost_thread main.cpp

Переписанная программа из прошлых лекций (pthreads=>boost):

Многопоточный сервер на boost.

Простой многопоточный сервер, который возвращает все что прислал клиент (по TCP):

Источник

Потоки, блокировки и условные переменные в C++11 [Часть 1]

В первой части этой статьи основное внимание будет уделено потокам и блокировкам в С++11, условные переменные во всей своей красе будут подробно рассмотрены во второй части…

Потоки

В C++11, работа с потокам осуществляется по средствам класса std::thread (доступного из заголовочного файла ), который может работать с регулярными функциями, лямбдами и функторами. Кроме того, он позволяет вам передавать любое число параметров в функцию потока.

В этом примере, thr — это объект, представляющий поток, в котором будет выполняться функция threadFunction() . Вызов join блокирует вызывающий поток (в нашем случае — поток main) до тех пор, пока thr (а точнее threadFunction() ) не выполнит свою работу. Если функция потока возвращает значение — оно будет проигнорировано. Однако принять функция может любое количество параметров.

Несмотря на то, что передавать можно любое число параметров, все они были переданы по значению Если в функцию необходимо передать параметры по ссылке, они должны быть обернуты в std::ref или std::cref , как в примере:

Читайте также:  Как сделать синхронизацию музыки в itunes

Программа напечатает в консоль 2. Если не использовать std::ref , то результатом работы программы будет 1.

Помимо метода join , следует рассмотреть еще один, похожий метод — detach .
detach позволяет отсоединить поток от объекта, иными словами, сделать его фоновым. К отсоединенным потокам больше нельзя применять join .

Также следует отметить, что если функция потока кидает исключение, то оно не будет поймано try-catch блоком. Т.е. следующий код не будет работать (точнее работать то будет, но не так как было задумано: без перехвата исключений):

Для передачи исключений между потоками, необходимо ловить их в функции потока и хранить их где-то, чтобы, в дальнейшем, получить к ним доступ.

Прежде, чем двигаться дальше, хочу отметить некоторые полезные функции, предоставляемые , в пространстве имен std::this_thread :

  • get_id: возвращает id текущего потока
  • yield: говорит планировщику выполнять другие потоки, может использоваться при активном ожидании
  • sleep_for: блокирует выполнение текущего потока в течение установленного периода
  • sleep_until: блокирует выполнение текущего потока, пока не будет достигнут указанный момент времени

Блокировки

В последнем примере, я должен был синхронизировать доступ к вектору g_exceptions , чтобы быть уверенным, что только один поток одновременно может вставить новый элемент. Для этого я использовал мьютекс и блокировку на мьютекс. Мьютекс — базовый элемент синхронизации и в С++11 представлен в 4 формах в заголовочном файле :

  • mutex: обеспечивает базовые функции lock() и unlock() и не блокируемый метод try_lock()
  • recursive_mutex: может войти «сам в себя»
  • timed_mutex: в отличие от обычного мьютекса, имеет еще два метода: try_lock_for() и try_lock_until()
  • recursive_timed_mutex: это комбинация timed_mutex и recursive_mutex

Приведу пример использования std::mutex с упомянутыми ранее функциями-помощниками get_id() и sleep_for() :

Программа должна выдавать примерно следующее:

Перед обращением к общим данным, мьютекс должен быть заблокирован методом lock , а после окончания работы с общими данными — разблокирован методом unlock .

Следующий пример показывает простой потокобезопасный контейнер (реализованный на базе std::vector ), имеющий методы add() для добавления одного элемента и addrange() для добавления нескольких элементов.
Примечание: и всё же этот контейнер не является полностью потокобезопасным по нескольким причинам, включая использование va_args . Также, метод dump() не должен принадлежать контейнеру, а должен быть автономной функцией. Цель этого примера в том, что показать основные концепции использования мьютексов, а не не сделать полноценный, безошибочный, потокобезопасный контейнер.

При выполнении этой программы произойдет deadlock (взаимоблокировка, т.е. заблокированный поток так и останется ждать). Причиной является то, что контейнер пытается получить мьютекс несколько раз до его освобождения (вызова unlock ), что невозможно. Здесь и выходит на сцену std::recursive_mutex , который позволяет получать тот же мьютекс несколько раз. Максимальное количество получения мьютекса не определено, но если это количество будет достигно, то lock бросит исключение std::system_error. Поэтому, решение проблемы в коде выше (кроме изменения реализации addrange() , чтобы не вызывались lock и unlock ), заключается в замене мьютекса на std::recursive_mutex .

Теперь, результат работы программы будет следующего вида:

Вы, наверное, заметили, что при вызове threadFunction() , генерируются одни и те же числа. Это происходит потому, что функция void srand (unsigned int seed); инициализирует seed только для потока main. В других потоках, генератор псевдо-случайных чисел не инициализируется и получаются каждый раз одни и те же числа.
Явная блокировка и разблокировка могут привести к ошибкам, например, если вы забудете разблокировать поток или, наоборот, будет неправильный порядок блокировок — все это вызовет deadlock. Std предоставляет несколько классов и функций для решения этой проблемы.
Классы «обертки» позволяют непротиворечиво использовать мьютекс в RAII-стиле с автоматической блокировкой и разблокировкой в рамках одного блока. Эти классы:

  • lock_guard: когда объект создан, он пытается получить мьютекс (вызывая lock() ), а когда объект уничтожен, он автоматически освобождает мьютекс (вызывая unlock() )
  • unique_lock: в отличие от lock_guard , также поддерживает отложенную блокировку, временную блокировку, рекурсивную блокировку и использование условных переменных

С учетом этого, мы можем переписать класс контейнер следующим образом:

Можно поспорить насчет того, что метод dump() должен быть константным, ибо не изменяет состояние контейнера. Попробуйте сделать его таковым и получите ошибку при компиляции:

Мьютекс (не зависимо от формы реализации), должен быть получен и освобожден, а это подразумевает использование не константных методов lock() и unlock() . Таким образом, аргумент lock_guard не может быть константой. Решение этой проблемы заключается в том, чтобы сделать мьютекс mutable , тогда спецификатор const будет игнорироваться и это позволит изменять состояние из константных функций.

Конструкторы классов «оберток» могут принимать параметр, определяющий политику блокировки:

  • defer_lock типа defer_lock_t : не получать мьютекс
  • try_to_lock типа try_to_lock_t : попытаться получить мьютекс без блокировки
  • adopt_lock типа adopt_lock_t : предполагается, что у вызывающего потока уже есть мьютекс

Объявлены они следующим образом:

Помимо «оберток» для мьютексов, std также предоставляет несколько методов для блокировки одного или нескольких мьютексов:

  • lock: блокирует мьютекс, используя алгоритм избегания deadlock’ов (используя lock() , try_lock() и unlock() )
  • try_lock: пытается блокировать мьютексы в порядке, в котором они были указаны

Вот типичный пример возникновения взаимоблокировки (deadlock): у нас есть некий контейнер с элементами и функция exchange() , которая меняет местами два элемента разных контейнеров. Для потокобезопасности, функция синхронизирует доступ к этим контейнерам, получая мьютекс, связанный с каждым контейнером.

Предположим, что эта функция вызвана из двух разных потоков, из первого потока: элемент удаляется из 1 контейнера и добавляется во 2, из второго потока, наоборот, элемент удаляется из 2 контейнера и добавляется в 1. Это может вызвать deadlock (если контекст потока переключается от одного потока к другому, сразу после первой блокировки).

Для решения этой проблемы можно использовать std::lock , который гарантирует блокировку безопасным (с точки зрения взаимоблокировки) способом:

Источник

Adblock
detector