10/20/2006 3:47:00 AM

Случаются ситуации, когда данные должны быть обработаны эксклюзивно. Это значит, что среди немереного количества ваших параллельных задач вдруг встречается такая, которая может быть выполнена только при условии, что других задач в данный момент не обрабатывается.
Иными словами, нам нужно подождать, пока уже запущенные параллельные задачи закончатся, после чего запустить нашу эксклюзивную задачу, и только после ее завершения продолжить выполнение других параллельных задач.

Так же нередко встречаются ситуации, при которых нужно вообще перестать обрабатывать поступающие задачи. Например, пользователь нажал кнопку "отмена" и нам нужно перестать проверять ссылки на "живучесть".

Для разрешения таких ситуаций у класса Arbiter существует метод Interleave(..).
Пример его использования:

1. Arbiter.Activate(queue,
2.     Arbiter.Interleave(
3.         new TeardownReceiverGroup(
4.             Arbiter.Receive<EmptyValue>(false, stopPort,
5.                 delegate(EmptyValue signal) { disp.Dispose(); }
6.             )
7.         ),
8.         new ExclusiveReceiverGroup(
9.             Arbiter.Receive<int>(true, exclusivePort, StartExclusiveTask)
10.         ),
11.         new ConcurrentReceiverGroup(
12.             Arbiter.Receive<int>(true, normalPort, StartParallelTask)
13.         )
14.     )
15. );

Метод Interleave(..) принимает на вход три параметра: три группы получателей-обработчиков.

Первая группа: TeardownReceiverGroup. Обработчики в этой группе не могут быть зарегистрированы "на постоянной основе", именно поэтому при регистрации получателя (строка 4) я указываю false. Это потому, что при "срабатывании" этой группы, то есть, как только один из получателей, зарегистрированных в ней, принимает задачу, прием других задач и запуск других обработчиков прекращается.
Имеются в виду только те обработчики, которые были зарегистрированы в том же методе Interleave, а не всех во всем приложении, естественно. Все другие обработчики для этой и других очередей продолжают работать.
Таким образом, группу TeardownReceiverGroup следует понимать как отмему регистрации всех получателей, сделанную в текущем методе Interleave. Что, в общем-то, и следует из ее названия.

Вторая группа: ExclusiveReceiverPort. Вот именно сюда и следует помещать обработчики задач, которые будут выполняться эксклюзивно. В моем примере при появлении значения в порту exclusivePort CCR подождет, пока завершатся уже запущенные задачи из группы ConcurrentReceiverPort (см. ниже), после чего вызовет метод StartExclusiveTask, передав ему полученное значение. И только после завершения StartExclusiveTask будет продолжен запуск параллельных задач из ConcurrentReceiverPort.
Обработчики в этой группе могут быть зарегистрированы "на постоянной основе".

Третья группа: ConcurrentReceiverPort. Это основная группа, где регистрируются обработчики, которые должны запускаться параллельно (что тоже следует из названия).
Обработчики в этой группе могут тоже быть зарегистрированы "на постоянной основе".

Итак, вернемся к задаче завершения приложения.
В моем простом случае я знаю, что пул не используется для других целей, поэтому мне достаточно просто "прибить" пул потоков при получении сигнала о завершении.
Поэтому в группе TeardownReceiverGroup я регистрирую обработчик "пустого значения", который, при появлении этого значения в порту stopPort просто вызывает метод Dispose у пула потоков.
В других случаях, если пул используется другими очередями, другими обработчиками и т.д. и алгоритм завершения будет другой :)
Видимо, для удобства в подобных ситуациях в CCR есть класс Shutdown, который представляет собой по сути набор портов (Success, Excepion) и экземпляр которого можно передавать куда-нибудь..

Tags:

10/19/2006 3:47:00 AM

Вот меня спросили: что, если нужно просто запустить задачу асинхронно? Проще использовать BackgroundWorker или воспользоваться CCR?
На мой взгляд, если где-то в недрах приложения вам нужно запустить один асинхронный таск - то сделать это проще через какой-нибудь BeginInvoke. Или запустить BackgroundWorker. Не тащить же ради этого CCR в проект (другое дело, что если вы задумаете все же серьезно заняться производительностью и распараллелить все, что только можно - тут вам CCR очень сильно поможет).
Но если вы уже используете CCR в приложении, то гораздо проще не городить зоопарк, а продолжать использовать его. Вот пример такого простого запуска, который гораздо проще, кстати, чем BackgroundWorker:

 

Arbiter.Activate(queue,
    Arbiter.FromHandler(delegate() { /* do something */ })
);

 

Я попробую показать еще пару примеров использования:

Arbiter.Activate(queue,
    Arbiter.JoinedReceive<double, double>(true, taxesPort, profitPort, CalculateMoney)
);

Метод JoinedReceive - своего рода "синхронизатор". Он дождется, пока будет вычислена и помещена в порт "taxesPort" величина налога, пока будет вычислена и помещена в порт "profitPort" величина прибыли, и только получив обе этих величины вызовет (в отдельном потоке) метод CalculateMoney(double, double).


Port<Item> itemsPort = new Port<Item>();
Arbiter.Activate(queue,
    Arbiter.MultipleItemReceive<double>(
        true,
        itemsPort,
        5,
        ProcessFiveItems
    )
);

Вероятно, метод MultipleItemReceive<..> - один из наиболее полезных. Он позволяет делать пакетную обработку данных. Например в данном случае метод ProcessFiveItems(ICollection<double>) будет вызван тогда, когда "накопится" пять значений. Они, все пять, будут переданы методу в качестве коллекции. Когда "накопится" еще пять - метод будет вызван снова (или не будет, зависит от первого параметра).


Ну, и наконец еще один пример + объяснение того, почему мои примеры из предыдущих постингов могут отработать полностью, а могут - нет.

1. using (Dispatcher disp = new Dispatcher(10, "Main Pool"))
2. {
3.     DispatcherQueue queue = new DispatcherQueue("Main Queue", disp);
4.     
5.     ...
6.     for (int i=0; i<1000; i++)
7.     {
8.         ...
9.     }
10.  
11. }

Все дело в строке 1.
Предположим, создав Dispatcher с максимальным количеством потоков равным 10, мы запускаем обработку тысячи операций (цикл 6-9).
Естественно, 10 будут запущены сразу, остальные поставлены в очереди. Пока идет цикл, постановка в очереди, какие-то из первых операций, возможно, будут завершены, начнутся новые. Но все - явно не успеют начаться.
Итак, наш цикл закончился. Программа идет дальше.
Естественно, при выходе за пределы using'а (11 и далее) у dispatcher'а будет вызван Dispose(). При этом вот что получается: задачи, которые еще только находятся в очереди, но не запущены, так и не будут запущены. А, собственно, где, если нет пула потоков, правила для которого содержит Арбитр?

То есть, таким образом, наше приложение может завершиться и не дождавшись даже старта всех заданий.

Так, как постинг получился уже достаточно большим, то пример того, как можно "бороться" с этой ситуацией, а так же то, как обработать ситуацию "все молчат, говорю здесь я" (применительно к параллельной обработке задач) я покажу в следующий раз. :)

Tags:

10/17/2006 2:35:00 AM

Чтобы было что обрабатывать, давайте представим, что нам нужно проверить список URLов на предмет того, живы ли они (вроде, достаточно жизненно).
В этом примере сначала код, потом объяснения :)

//Вспомогательный метод.
//Возвращает набор портов, в первый из которых она
//положит успешный ответ сервера, а во второй - ошибку.
static PortSet<WebResponse, WebException> ProcessURL(string url)
{
    //Сначала создадим порты
    Port<WebResponse> responsePort = new Port<WebResponse>();
    Port<WebException> exceptionPort = new Port<WebException>();
 
    //сформируем запрос
    WebRequest request = HttpWebRequest.Create(url);
    request.Method = "HEAD";
    
    //Выполним запрос асинхронно.
    //Получателя ответа напишем здесь же в виде анонимного метода.
    request.BeginGetResponse(
        delegate(IAsyncResult result)
        {
            try
            {
                //пробуем получить ответ.
                //в случае успеха кладем его в соответствующий порт.
                WebResponse response = request.EndGetResponse(result);
                responsePort.Post(response);
            }
            catch (WebException ex)
            {
                //в случае ошибки кладем ее в порт ошибок.
                //пусть с ней разбирается тот, кто "знает"
                //что делать в такой ситуации.
                exceptionPort.Post(ex);
            }
        },
        null //state нам не нужен.
    );
 
    return new PortSet<WebResponse, WebException>(responsePort, exceptionPort);
}
 
static void Main(string[] args)
{
    //Список адресов, которые мы хотим проверить.
    string[] urls = new string[] {
        "http://alexey.raga.name",
        "http://error.error",
        "http://photos.raga.name"};
 
    //Создадим диспатчер и очередь, как в прошлом примере
    using (Dispatcher disp = new Dispatcher(3, "Main Pool"))
    {
        DispatcherQueue queue = new DispatcherQueue("Main Queue", disp);
 
        //Запустим проверку по всем адресам
        foreach (string url in urls)
        {
            //Правила для Арбитра:
            //ожидается либо ответ сервера, либо ошибка.
            Arbiter.Activate(queue,
                Arbiter.Choice<WebResponse, WebException>(
                    ProcessURL(url), //Наш вспомогательный метод вернет пару портов.
                    HandleResponse, //Вызвать в случае получения "нормального" ответа
                    HandleError //Вызвать в случае получения ошибки.
                )
            );
        }
    }
}
 
static void HandleResponse(WebResponse response)
{
    // ... обработка результата ...
}
 
static void HandleError(WebException exception)
{
    // ... обработка ошибки ...
}

Что мы тут видим.
Я сделал вспомогательный метод ProcessURL, в котором асинхронно выполняю запрос к веб-серверу. В качестве коллбека я передаю делегат, анонимный метод. В котором и помещаю результат (ответ сервера либо ошибку) в соответствующий порт.

Перед тем, как пойти дальше, объясню, почему я сразу не вызвал в ProcessURL методы HandleResponse и HandleError вместо того, чтобы класть результаты в порты.
Нет, не потому, что мне очень хотелось использовать CCR :)
Есть две другие, более веские причины.

  1. Методологическая. Вызов этих методов - не зона ответственности вспомогательного метода. Он - всего лишь хелпер, он умеет сделать запрос и вернуть результат его выполнения. В моем случае метод ProcessURL ничего не знает о внешней инфраструктуре, от нее не зависит, принимает известный ему параметр и возвращает результат, дальнейший путь обработки которого не в его компетенции.
  2. Техническая. Если я вызову в том анонимном методе эти обработчики, а в них еще другие функции, а в тех - еще, и т.д, то я получу своеобразную "спагетти", цепочку, в которой все действия выполняются внутри этого самого анонимного метода. Он не завершится до тех пор, пока не завершится вся цепочка.
    Зачем мне это? Легче положить результат в порт и выйти из метода. Пусть там потом с ним кому надо, тот и разбирается :) Получаем как бы нормальный последовательный вызов функций: одна закончилась - началась вторая.

Ну, разобрались со вспомогательным методом, теперь по существу: сам цикл.
В нем я задаю правило для Арбитра, но уже не с помощью метода Receive<..>, а с помощью Choiсe<..>.
Для начала нужно иметь в виду тот факт, что с помощью метода Receive<..> можно "подписаться" на данные из порта "на постоянной основе" (то есть - зарегистрировали один раз и получаем всегда), а вот с помощью Choiсe<..> - только на получение одной порции данных.
Поэтому я регистрирую правило в цикле, каждый раз. А набор портов - он тоже каждый раз разный, его возвращает метод ProcessURL.
Кроме того, Choice<..> подразумевает, что данные будут получены либо в одном порту, либо во втором. И будет вызван соответствующий обработчик. После чего Арбитр просто забудет о существовании этого набора портов.
Все это сделано как раз для реализации этого "либо". Мне остается только знать, что будет вызван или один обработчик, или другой. Или "норма" или "ошибка".

Итак, все же к обработке ошибок. Она становится проще, и вот почему:

  1. Мне не нужно принимать решение о том, что делать с ошибкой там же, где я ее перехватил. В случае, если в месте возникновения исключения я ничего не могу с ним поделать (а в большинстве случаев это так, например метод ProcessURL вообще не может "знать" как быть в такой ситуации), мне не нужно ни подавлять его, ни пропускать выше, чтобы кто-то его (может быть) перехватил, ни даже возбуждать собственное исключение. Мне нужно просто... Смотрим пункт 2 :)
  2. С использованием данного подхода я просто получаю две "ветки" своего алгоритма. Одна исполняется в случае "нормального" поведения, другая - в случае "ненормального" (это, кстати, совсем не обязательно должен быть exception). То есть, я просто как бы говорю: "если все ОК - то пойдем сюда, если нет - то пойдем туда".

Ошибка становится не чем-то неожиданным и из ряда вон выходящим, а нормальным и, главное, ожидаемым и обрабатываемым событием системы. Мне, как программисту, уже существенно проще работать с таким событием и существенно сложнее его проигнорировать и, таким образом, накосячить.
Вдумайтесь. Мне не надо здесь и сейчас решать, что делать с ошибкой. Именно необходимость (и невозможность) подобного решения "здесь и сейчас" в основном приводит к тому, что ошибку вообще не обрабатывают, а, в крайнем случае, регистрируют в логе. Осмысленно же обработать ошибку бывает делом ой каким непростым (я уже писал об этом на старом блоге).

Но не в этом случае, согласитесь? :) Здесь в момент принятия решения (вызов Arbiter.Choice<..>) я четко знаю, что делал и имею достаточно информации для того, чтобы решить "а что же делать, если вызов не получился". И моя ошибка будет спокойно обрабатываться себе опять же в отдельном, заметьте, потоке.
Все.

В следующий раз я не стану трепаться о всякой там методологии, а просто покажу несколько фичей, которые могут понадобиться (и понадобятся) при реальной работе с CCR.
В частности объясню, почему мои примеры "не совсем работоспособны" :) 

Tags:

10/16/2006 1:53:00 AM

Речь пойдет о библиотеке "Concurrency and Coordination Runtime", о которой я писал в предыдущем постинге.
Библиотека оказалась достаточно простой и удобной в использовании.

В качестве первого примера я покажу, как с помощью CCR "распараллелить" цикл: простая задача, которая очень часто может существенно увеличить показатели производительности приложения.

Естественно, в этом и других примерах, предполагается, что ссылка на Ccr.Core.dll в проекте присутствует и строки "using Microsoft.Ccr.Core;" в нужных местах добавлены :)

Пример несколько надуманный, но главное - идея.
Представим, что нам нужно обработать какую-нибудь статистику (например, финансовую) для группы пользователей. Предположим, что обработка статистики для одного пользователя занимает существенное время. Соответственно, нам уже не подходит вариант просто сделать это в цикле один за одним. Это слишком долго. Гораздо привлекательнее будет вариант, когда каждая итерация цикла выполняется параллельно и независимо друг от друга.

Это мы и будем делать.
Но для начала совсем-совсем кратенько об основных классах CCR, которые мы будем использовать:

  • Dispatcher. Этот класс представляет собой просто пул потоков. В .NET Framework есть "встроенный" пул ThreadPool. Будем считать в первом приближении, что Dispatcher - это то же самое. Только, в отличие от "встроенного" пула, который всегда один, мы можем иметь в своем приложении несколько Dispatcher'ов. Или не иметь вообще ни одного, тогда будет автоматически задействован "встроенный" ThreadPool.
  • DispatcherQueue. Это просто очередь. Очередь "привязывается" к пулу потоков ("встроенному" или созданному нами). Фактически, то же самое, что и очередь внутри ThreadPool, с тем исключением, что мы можем иметь несколько таких очередей. Отличие это очень важное, так как, например, для более важных, но редких сообщений мы можем создать собственную очередь и тогда они не "погрязнут" в толпе более частых, но менее важных сообщений.
  • Port<..>. Порт представляет собой некоторый шлюз (ну, порт) в котором передается параметр. Физически это тоже очередь. Звучит сложно, но примитивен в использовании. В примере будет видно.
  • Arbiter. Этот статический класс предназначен для выполнения роли "метродотеля" :) Он решает что, где и как должно обрабатываться. Нам нужно только помочь ему в этом - дать указания :)
Ну, а теперь пример:

//Функция, возвращающая пользователей для обработки.
static IEnumerable<User> GetUsersForUpdate()
{
    //Откуда-то вынимаем и возвращаем коллекцию пользователей.
    yield break;
}

//Здесь-то мы все и сделаем.
static void Main(string[] args)
{
    //Создаем пул потоков.
    //Могли бы и не создавать, тогда был бы использован ThreadPool.
    using (Dispatcher disp = new Dispatcher(2, "Main Pool"))
    {
        //Создаем очередь. Таких очередей может быть много, но пока хватит одной.
        //Очереди указываем, какой пул использовать.
        //Если бы не указали - был бы использован ThreadPool.
        DispatcherQueue queue = new DispatcherQueue("Main Queue", disp);

        //Порт. Туда будут "запихиваться" объекты для обработки.
        //Создать - вот, собственно, все, что нам от него нужно.
        Port<User> userPort = new Port<User>();

        //Задаем Арбитру правило для очереди "queue".
        //С помощью арбитра указываем среде, что все приходящие в "userPort"
        //экземпляры класса "User" будут обрабатываться методом "UpdateUserStatistics".
        Arbiter.Activate(queue,
            Arbiter.Receive<User>(true, userPort, UpdateUserStatistics)
        );

        //Осталось только отправить в порт те экземпляры "User",
        //которые необходимо обработать. И все.
        //Арбитр отправит их в соответсвии заданным правилам в "UpdateUserStatistics".

        foreach (User user in GetUsersForUpdate())
        {
            userPort.Post(user);
        }
    }
}

//Функция обработки пользователя.
static void UpdateUserStatistics(User user)
{
    //Здесь делаем что-то очень долгое
    user.Save();
}

Вот и все. Каждая функция "UpdateStatistics" будет вызываться в потоке. В каком, как - уже не наша забота. :) Важно, что они будут выполняться параллельно.

В "обычном", непараллельном варианте, мы бы просто вызывали функцию "UpdateStatistics", а тут мы заставляем это делать CCR. Код стал больше всего на несколько строк, читабельность и понимабельность его не пострадала.
Фактически мы имеем привычный подход с написанием и вызовом функций-обработчиков, но получаем огромную выгоду в виде паралеллизма.
Это как минимум.
В следующих постингах я расскажу о других функциональных и методологических выгодах :)

Тема следующего постинга: Обработка ошибок с использованием CCR. Тут уже не об усложнении, а об упрощении говорить придется :)

Tags:

Powered by BlogEngine.NET 1.6.0.0

About the author

Alexey Raga Alexey Raga
.NET software developer.

E-mail me Send mail

Twitter


Disclaimer

The opinions expressed herein are my own personal opinions and do not represent my employer's view in anyway.

© Copyright 2010

Sign in