Модулите Enum и Stream


В един функционален език кодът е поредица от трансформации на стойности. Една функция бива извиквана върху резултата от друга и така докато не се получи желаният краен резултат.

Когато работим с колекции от данни, често искаме дадена функция да се приложи на всички елементи на колекция и да се получи нова колекция, или пък да се филтрират някои елементи на колекция, или пък да се получи резултат базиран на всички или някои от елементите на колекцията. Функции като map, filter, reduce правят това. Модулът Enum съдържа имплементации на такива функции, които не оперират само над списъци, но и над други колекции, да речем речници.

Когато искаме да имаме подобни функции, но те да се изпълняват само, когато резултатът им трябва да бъде прочетен, тоест да се смятат мързеливо (lazy), бихме превърнали колекциите в lazy версиите им - потоци. Модулът Stream ни предоставя такава функционалност и lazy версии на някои от функциите в Enum.

В тази публикация ще разгледаме тези два модула.

Enum

Както казахме, в стандартната библиотека на Elixir има модул, който е предназначен за работа с колекции (типове, които могат да се обхождат и по-формално казано: имплементират Enumerable протокола). Това ще е един от модулите, които ще използвате най-много, така че можете да разгледате неговата документация.

Колекциите, които идват с езика и могат да ползват функциите на Enum са MapSet, Function, Range, Stream, IO.Stream, Date.Range, HashSet, File.Stream, GenEvent.Stream, List, Map и HashDict. Някои от тях са deprecated. Когато говорим за протоколи, ще ви покажем как да направите свой тип Enumerable. Така ще можете да ползвате функциите дефинирани в Stream и Enum. Важните типове за които можем да ползваме Enum и Stream са MapSet (множество), Range и Date.Range (поредици), Stream, IO.Stream и File.Stream (потоци от данни), List (списък) и Map (речник).

В публикацията за списъци дефинирахме функциите reduce, map и filter, които са достъпни в модула Enum. В модула List има foldr и foldl, които са reduce, оптимизиран за списъци. Функциите в Enum работят над произволна колекция, която имплементира протокола Enumerable, затова обхващат по-широк спектър от типове. Всички функции в този модул се базират над имплементация на reduce за дадения тип.

В тази публикация, ние няма да разглеждаме функциите в Enum. Тези функции са добре документирани в документацията на модула. Това, което ще направим е да анализираме тези функции.

Какво е енумерация? Това е някакъв тип, който съдържа стойности от други типове, които биха могли да се обходят една по една в даден ред. При списъка - знаем как да обходим елементите му: взимаме head елемента, след това обхождаме по същия начин tail списъка:

defmodule ListUtils do
  def print_all([]), do: :ok
  def print_all([head | tail]) do
    IO.puts(head)
    print_all(tail)
  end
end

ListUtils.print_all([1, 3, 5])
#output: 1
#output: 3
#output: 5
#=> :ok

Можем да обходим и речник, като вземем ключовете му и ги използваме, за да прочетем елементите му:

defmodule MapUtils do
  def print_all(map) when is_map(map) do
    keys = Map.keys(map)

    print(keys, map)
  end

  defp print([], _), do: :ok
  defp print([key | rest], map) do
    IO.puts("#{key} : #{Map.get(map, key)}")
    print(rest, map)
  end
end

MapUtils.print_all(%{a: 3, b: 2})
#output: a : 3
#output: b : 2
#=> :ok

По подобен начин можем да обходим и поредица и множество и други колекции, на които знаем как да вземем първия елемент и да обходим рекурсивно останалите. Горната print_all/1 функция обхожда колекция и извежда на стандартния изход всеки елемент. Можем да напишем подобна функция, която вдига на квадрат стойностите в колекция от числа. Можем да напишем и така, която за всеки елемент в колекция от низове, връща елемент, представляващ същия стринг, но само с главни букви. Функцията map обобщава това действие, като взима колекция, която знаем как да обходим елемент по елемент (буквално това е енумерация) и прилага функция върху всеки елемент, връщайки списък от трансформирани елементи. Както знаем от публикацията за списъци map е частен случай на reduce. Има много полезни частни случаи на reduce и точно това са функциите в Enum.

Друго важно свойство на Enum е, че функциите очакват eager колекции. Когато извикаме Enum.map(1..10, action), поредицата 1..10 ще бъде обходена веднага и действието action ще бъде приложено на всеки елемент. Нека разгледаме следния пример:

1..20
|> Enum.map(&Kernel.+(&1, 1))
|> Enum.filter(&Integer.is_odd/1)
|> Enum.map(&Kernel.*(&1, 2))
|> Enum.shuffle()
|> Enum.take_random(4)
#=> [26, 42, 10, 22] # Примерен резултат

На всеки етап дадената енумерация ще бъде обходена. Това значи, че ще обхождаме колекция 5 пъти. Запомнете това. Функциите в модула Stream не се държат така, вместо на всяка стъпка да се обхожда колекция и да се прилага операция, на всяка стъпка се строи една комплексна операция, която се прилага върху оригиналната колекция при поискване. Но за потоци и за модула Stream ще си поговорим след малко.

Да обобщим: модулът Enum съдържа функции, които могат да се прилагат върху колекции, чиито елементи могат да се обхождат един по един. Тези функции са абстракции на полезни частни случаи и се базират върху операцията reduce. Няма да навлизаме в подробности сега, но за всяка колекция, която бива използвана от другите функции в Enum има имплементирана версия на reduce, която тези функции ползват за основа. Операциите в Enum са eager: изпълняват се веднага върху подадената им енумерация и връщат нова колекция (най-често списък) или стойност. Всяка такава операция обикновено е линейна и представлява едно обхождане на колекцията елемент по елемент с прилагане на действие върху всеки елемент.

Отново ви приканваме да се запознаете с всички функции в Enum чрез чудесната документация на модула.

Сега ще разгледаме нещо интересно. Това, че има for в Elixir!

Comprehensions

Elixir предоставя съкратен синтаксис за извършване на map и filter върху колекция, тъй като много често се налага да се използват тези операции. Например ако искаме да повдигнем елементите на една колекция на квадрат и да филтрираме резултата можем да направим следното:

[1,2,3,4,5]
|> Enum.filter(fn x -> x < 4 end)
|> Enum.map(fn x -> x * x end)
#=> [1, 4, 9]

Горният пример може да се имплементира така:

for x <- [1, 2, 3, 4, 5], x < 4, do: x * x
#=> [1, 4, 9]

Друго преимущество на for израза, е че може да филтрира с match-ване. Да речем, ако искаме да филтрираме само елементи, които match-ват дадена структура ще напишем нещо такова:

list = [{:ok, 1}, {:error, 2}, {:ok, 3}, {:error, 4}]
#=> [ok: 1, error: 2, ok: 3, error: 4]

list
|> Enum.filter(fn x -> Kernel.match?({:ok, _}, x) end)
|> Enum.map(fn {:ok, x} -> x end)
#=> [1, 3]

Тук искаме да филтрираме само елементи от вида {:ok, _}. Трябва да ползваме Enum.filter/2 и фунцкията Kernel.match?/2, която проверява дали аргументите ѝ се match-ват. С comprehension това ще изглежда така:

list = [{:ok, 1}, {:error, 2}, {:ok, 3}, {:error, 4}]
#=> [ok: 1, error: 2, ok: 3, error: 4]

for {:ok, x} <- list, do: x
#=> [1, 3]

Можем да имаме по няколко генератора за един comprehension:

for x <- [1, 2], y <- [3, 4], do: {x, y}
#=> [{1, 3}, {1, 4}, {2, 3}, {2, 4}]

Блокът в дясно ще се изпълни за всяка една двойка или формално казано - с Декартовото произведение на двете колекции.

В примерите до сега comprehension-ите връщат колекцията след като се изпълни филтъра и после map-а. Това може да се промени със параметъра into, където можем да подадем нещо което имплементира Collectable протокола (за който ще говорим скоро по-подробно), в което ще се съхрани резултата от comprehension-а. Например можем да изградим речник:

for x <- ~w{cat dog elephant mammut}, into: %{}, do: {x, String.length(x)}
#=> %{"cat" => 3, "dog" => 3, "elephant" => 8, "mammut" => 6}

Можем да подадем съществуваща структура и тя ще бъде допълнена:

for x <- ~w{cat dog elephant mammut}, into: %{"fish" => 4}, do: {x, String.length(x)}
#=> %{"cat" => 3, "dog" => 3, "elephant" => 8, "fish" => 4, "mammut" => 6}

Потоци и модула Stream

Корекурсия

Структурите във функционалните езици са често рекурсивни. Обикновено са и persistent и непроменими. Пример е списъкът. Тази структура е дефинирана рекурсивно : списък е двойка от глава, която държи стойност и опашка, която е списък. Друга рекурсивна структура е Hash Array Mapped Trie структурата, над която е имплементиран Мap-a. Логично е рекурсивни структури да се обхождат и изграждат чрез рекурсивни функции. Тези рекурсивни функции взимат цялата структура и използвайки начинът, по който е изградена, я раздробяват и опростяват, докато не стигнат до най-простото и състояние - да речем при списъците, това е празния списък.

В математиката много операции имат двойнствена (dual) операция, която обикновено наричаме ко-операция. Рекурсията има двойнствена операция, наречена корекурсия. Ако рекурсията ни позволява да оперираме над сложна структура, опростявайки я и намирайки дъно на рекурсията, то корекурсията ни позволява да ползваме проста начална стойност като база за построяването на сложна структура, която може да е даже безкрайна.

Нека първо разгледаме нещо много познато - факториела. Дефинираме функция за факториел рекурсивно:

defmodule Factorial do
  def of(0), do: 1 # Дъно на рекурсията, най-прост случай
  def of(n) when n > 0, do: n * of(n - 1) # Тяло на рекурсията
end

Смятаме факториел N! чрез функция, която използва копие на себе си с по-прост случай ((N-1)!), и така докато стигнем най-простия случай 0!. За да се сметне 5!, ще трябва да се сметне 4!. За да се сметне 4!, ще трябва да се сметне 3!. И така накрая ще стигнем до 0!, което е просто 1, ще можем да се върнем назад, смятайки 1! чрез него и така до 5!. Симулирането на това връщане назад от базовия случай - 0! е точно корекурсията. Рекурсията тръгва от сложния случай 5!, за да стигне до простия - 0!. Корекурсията тръгва от простия 0! и може да изгради безкрайно много сложни случаи базирани на него. Поток от факториели!

Функции, които корекурсивно продуцират потоци (streams) се наричат генератори. Нека да напишем генератор на факториели:

defmodule Factorial do
  defstruct value: 1, n: 0

  def new do
    %__MODULE__{}
  end

  def next(%__MODULE__{value: v, n: n}) do
    %__MODULE__{value: v * (n + 1), n: n + 1}
  end

  def of(n) do
    new()
    |> of(n)
  end

  defp of(%__MODULE__{n: n, value: v}, n), do: v
  defp of(%__MODULE__{n: n} = state, k) when n < k do
    state
    |> next()
    |> of(k)
  end
end


factorial_of_5 =
  Factorial.new()
  |> Factorial.next()
  |> Factorial.next()
  |> Factorial.next()
  |> Factorial.next()
  |> Factorial.next()
  |> Map.get(:value)
#=> 120

# Или горното, но автоматизирано с рекурсия:
Factorial.of(5)
#=> 120

Ако дефинираме функция, която корекурсивно построява поток от стойности, можем да използваме рекурсия за да обходим тези стойности от най-базовата, до тази, която ни интересува.

В модула Stream има функции, които ни позволяват да си построим потоци, използвайки начална стойност или състояние и функции, които ни позволяват да си построим операции върху тези потоци. Интересен пример е функцията Stream.unfold/2, която можем да използваме, за да си построим факториел:

defmodule Factorial do
  def of(n) do
    bottom = n + 1

    Stream.unfold(
      {0, 1},
      fn
        {^bottom, _} -> nil
        {k, v} -> {{k, v}, {k + 1, v * (k + 1)}}
      end
    )
    |> Enum.to_list()
    |> List.last()
    |> Kernel.elem(1)
  end
end

Това прилича на имплементацията ни с next. Строим си поток с начална стойност {0, 1} или 0 за кой подред факториел имаме и 1 за стойността му. Вторият параметър на unfold е функция, която ако върне nil, потокът трябва да спре да създава нови стойности. Ако върне двойка с първи елемент текуща стойност или в нашия случай, {k, v} и втори елемент следваща стойност, съответно {k + 1, v * (k + 1)}, имаме нова стойност в потока. Правим си списък от потока, взимаме последната стойност на списъка и нейният втори елемент, което е стойността за дадения факториел. Има и по-умен начин да вземем тази стойност а и потока може да е безкраен. Можем да построим потока и на обратно. Важното е да видим, че един поток се строи от начална стойност и операция, която може да даде следващата стойност, използвайки текущата.

Вече имаме идея какво представляват потоците. Нека сега разгледаме модула Stream. Ще го направим по-подробно от Enum, поради това, че приемаме, че потоците са нови за нас, като идея.

Модула Stream

Ако погледнете документацията на модула Stream, ще видите, че той съдържа много подобни функции като модула Enum. От видяното по-горе и това, което знаем за енумерациите, можем да мислим за потоците като за енумерации, които се генерират при поискване (така наречените lazy enumerables). Например, ако изпълним този код:

1..10_000_000
|> Enum.map(&(&1 + 1))
|> Enum.take(5)
#=> [2, 3, 4, 5, 6]

Той ще отнеме няколко секунди да се изпълни, тъй като първо ще се изчисли целия Enum.map върху 10 милиона числа и после ще се вземат първите 5 от тях. Много по-ефективна имплементация би била:

1..10_000_000
|> Stream.map(&(&1 + 1))
|> Enum.take(5)
#=> [2, 3, 4, 5, 6]

Ако тествате горния пример, ще видите, че той се изпълнява веднага. Това е защото Stream.map/2 няма да обработи цялата колекция от 10 милиона числа, а ще върне “поток”, от който можем да вземаме числа и той ще извършва сметките когато поискаме число. Това което прави потоците изключително полезни, е че можем да ги композираме и така да описваме последователност от операции, като те ще се изпълнят така, че да не зареждаме цялата колекция в паметта, а обработваме елементите един по един при поискване.

Друг пример е при работата с файлове. Ако например искаме да намерим най-дългия ред в един файл, един от начините е:

File.read!("binaries.md") # Прочитаме файла
|> String.split("\n") # Превръщаме го във списък от редове
|> Enum.max_by(&String.length/1) # Намираме най-дългият ред

Това би било много неефективно ако файлът е голям, защото първо ще прочете целият файл в паметта, после ще генерира много дълъг списък от редовете във файла. По-ефективно би било да се ползват потоци:

File.open!("binaries.md", [:utf8]) # Отваряме файла
|> IO.stream(:line) # Създаваме поток от редовете му
|> Enum.max_by(&String.length/1) # Намираме най-дългият ред

Така файлът ще се обработва ред по ред, вместо да се чете целият в паметта. За горния пример има и съкратена версия:

File.stream!("binaries.md")
|> Enum.max_by(&String.length/1)

Ето един друг пример, където искаме да преброим броят думи в голям файл, в случая английският превод на Война и Мир, който е 3.2МБ текстов файл:

defmodule WarAndPiece do
  def number_of_words_with_enum do
    # Можете да свалите файла със:
    # curl http://www.gutenberg.org/files/2600/2600-0.txt > war_and_piece.txt
    File.read!("war_and_piece.txt")
    |> String.split
    |> length
  end

  def number_of_words_with_stream do
    File.stream!("war_and_piece.txt")
    |> Stream.flat_map(&String.split/1)
    |> Enum.reduce(0, fn _, acc -> acc + 1 end)
  end
end

:timer.tc(WarAndPiece, :number_of_words_with_enum, [])
#=> {1561933, 566309}
:timer.tc(WarAndPiece, :number_of_words_with_stream, [])
#=> {833594, 566309}

Както виждате, успяваме да свалим времето за изпълнение на половина само чрез използването на поток, вместо да четем и обработваме всички данни в паметта. Потоците са полезен инструмент, който трябва да можем да използваме. В повечето случаи можем да обработваме данните си като колекции, но има случаи, където потоците са по-удачни.

Безкрайни потоци

Нещо, което е уникално за потоците, поради тяхната корекурсивна природа, е че те могат да са безкрайни. Например можем да генерираме безкрайна редица от единици:

Stream.cycle([1])
|> Enum.take(10)
#=> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Както видяхме от примера с факториел, можем да използваме функцията Stream.unfold/2, за да генерираме поток, в който всяка следваща стойност зависи от предната. Ето как можем да направим поток, който генерира безкрайната редица на Фибоначи:

Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end) |> Enum.take(10)
#=> [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

Функцията Stream.cycle/1 може да е много полезна ако генерирате HTML и искате да направите редове на таблица, които да имат алтерниращи класове:

defmodule RenderTable do
  def render(list) do
    [
      "<table>\n",
      "  <tr>\n",
      "    <th>Items</th>\n",
      "  </tr>\n",
      render_list(list),
      "</table>"
    ]
  end

  defp render_list(list) do
    Stream.cycle(["odd", "even"])
    |> Stream.zip(list)
    |> Stream.map(&render_item/1)
    |> Enum.to_list()
  end

  defp render_item({class, item}) do
    [
      "<tr>\n",
      "  <td class=\"", class, "\">\n",
      "    ", item, "\n",
      "  </td>\n",
      "</tr>\n"
    ]
  end
end

IO.puts RenderTable.render(~w/milk butter bread/)
# <table>
#   <tr>
#     <th>Items</th>
#   </tr>
# <tr>
#   <td class="odd">
#     milk
#   </td>
# </tr>
# <tr>
#   <td class="even">
#     butter
#   </td>
# </tr>
# <tr>
#   <td class="odd">
#     bread
#   </td>
# </tr>
# </table>

Обърнете внимание как вместо да правим интерполация на низовете, генерираме списъци от низове. Това намалява копирането на памет, което виртуалната машина трябва да направи и прави генерирането на HTML-а ни по-бързо. Това е широко използвана техника във Phoenix Framework и дори има специално име: IOList. Интересна подробност, е че IOList може да съдържа и вложени списъци и няма нужда да се прави flatten ако рекурсивно генерираме данните, които трябва да отпечатаме или изпратим по сокет. Повече информация можете да прочетете тук. Друга интересна статия по темите IOList, потоци и ефективност (има сравнение с Ruby), може да прочетете тук. В бъдеща публикация, свързана с IO, ще се върнем по-подробно на тази тема.

Друго интересно нещо е, че използваме версии на функциите zip и map от модула Stream, които са lazy. Това означава, че тези операции се обединяват в една операция, която ще се приложи само веднъж, когато потокът бъде усвоен. Усвояването на поток е първото му eager четене, да речем Enum.to_list/1 ще трябва да го изчете, за да си построи списъка и е eager функция, трябва да върне списък при извикване. Ако бяхме ползвали Enum.map/2 тук, вместо Stream.map/2, потокът щеше да бъде усвоен и нов списък от map-натите елементи на потока щеше да бъде създаден.

Безкрайните потоци могат да се използват и за други неща, като например да дефинираме ресурси с тях. Ресурсите имат 3 фази:

  • Инициализация
  • Генериране
  • Почистване

Така например ако искаме да прочетем един файл, то трябва да го отворим, да прочетем всички данни от него и накрая да го затворим. Разгледайте документацията на Stream.resource/3 за повече подробности и пример: https://hexdocs.pm/elixir/Stream.html#resource/3.