Недавно обсуждали с коллегой достаточно, казалось бы, простую задачу: обрабатывая каждый элемент последовательности IObservable<T> нужно получить результаты из другого IObservable<U> и только потом переходить к следующему элементу.
Поясню на примере.
Допустим, у нас есть:
//здесь мы получаем номер
IObservable<int> numbers;
//функция, генерирующая набор простых чисел в диапазоне от 0 до value
public IObservable<int> GetPrimes(int value) {...}
И задача стоит простая: для каждого полученного в numbers числа “выдавать” набор простых чисел в диапазоне от 0 и до этого числа.
Задача, на самом деле, может быть любой: для каждого элемента обращаться к веб-сервису и получать результаты; для каждого имени файла подсчитать количество вхождений каждой буквы и выдать результат, и т.д.
Обобщаются все эти задачи одним: в процессе обработки элемента нужно “включить в результат” некую другую последовательность.
Казалось бы, чего проще! В LINQ эта задача давно решена с помощью SelectMany и решение выглядит следующим образом:
//для каждого элемента выдавать последовательность, возвращаемую
//функцией GetPrimes(...)
var result = numbers
.SelectMany(x=>GetPrimes(x));
И это будет работать! С одной оговоркой: это будет работать не совсем так, как нам нужно. Следующая марбл-диаграмма объясняет принцип работы SelectMany в Rx (на примере вышеуказанной конструкции):

Как видно, последовательность “numbers” генерирует некие значения (синие точки). Синие точки сами по себе не попадают на зелёную линию (результат) потому, что мы обрабатываем их с помощью .SelectMany, “производя” для каждого элемента новую последовательность с помощью метода GetPrimes(…). Последовательности изображены красной и жёлтой линиями. И вот уже их результаты попадают на зелёную линию Result. Но попадают они туда тоже асинхронно, в том же “темпе”, в котором функция GetPrimes их генерирует. То есть, результаты “путаются”, их порядок не гарантирован и не детерминирован.
Можно рассматривать эту ситуацию как если бы мы просто подписались на несколько независимых последовательностей (что в сущности и происходит).
Мы же хотим получить несколько иную картину: синяя точка + весь набор красных. Следующая синяя точка + весь набор жёлтых. И так далее.
Добиться этого можно с помощью преобразования “вложенной” последовательности в IEnumerable (благо библиотека Rx определяет методы для таких преобразований).
И тогда мы можем описать в общем виде функцию, позволяющую добиться искомого результата:
public static IObservable<U> ProduceMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> function)
{
return source
.SelectMany(x => function(x).ToEnumerable());
}
Сигнатура функции точно такая же, как и у SelectMany (монадный Bind, однако
), а воспользоваться ею теперь можно вот так:
using (numbers
.ProduceMany(x=>GetPrimes(x))
.Subscribe(Console.WriteLine))
{
Console.ReadKey();
}
Вот столько вот текста – и в конце функция на одну-две строчки.
Ну, просто хочется, чтобы было понятно… 