Процеси и OTP : Задачи


Последната тема (засега) за процеси. Задачите са процеси, които правят главно едно действие и не комуникират с други процеси, освен когато свършат работата си. Тогава пращат съобщение на този процес, който е причинил тяхното създаване.

Основната им цел е да превърнат прост sequential код в паралелен.

Пример : паралелен Map версия 2

defmodule PEnum do
  def map(enumerable, func) do
    enumerable
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(&Task.await/1)
  end
end

Модулът Task има две важни функции :

  1. Task.async/1 (има и MFA версия - Task.async/3), която създава задача и я link-ва към текущия процес, като също така добавя монитор.
  2. Task.await/2, която взима задача и опционално timeout (по подразбиране 5000 - или пет секунди) и чака резултат от задачата.

С помощта на Task можем да пишем прости background задачи по лесен начин.

В примера за всеки елемент на подадената колекция извикваме подадената функция в процес-задача и чакаме по ред на създаване на задачите.

Структурата Task

Една задача представлява структура с три полета.

Task.async(fn -> :nothing end)
# %Task{owner: #PID<0.2724.0>, pid: #PID<0.2727.0>, ref: #Reference<0.0.3.551>}

Полетата представляват:

  • owner - Процесът, който ще получи съобщение, когато задачата приключи.
  • pid - pid-ът на процеса на задачата.
  • ref - Идва от мониторът прикрепен към задачата.

Функцията Task.await/2 извиква един receive и използва тези три полета да изчака отговор.

Защо задачата е с link към процеса, който я създава?

Защото ако той ‘умре’, задачата няма къде да върне резултата си, затова трябва да бъде изчистена. Ако пък задачата ‘умре’ значи нещо не е наред. Тя трябва да прави нещо наистина просто и не би трябвало да свърши живота си с причина, различна от :normal.

Задача може да бъде стартирана със Task.start/1 или Task.start/3, тогава няма да бъде link-ната към текущия процес, но и няма да изпрати резултат от изпълнението си.

За повече информация и други функции свързани с Task, вижте документацията.

Task.Supervisor

Понякога искаме задачите да изпращат резултат, но да не се link-ват към текущия процес. Да речем това са задачи, които си говорят с отдалечен компонент/service и биха могли да получат грешка отвън.

Бихме могли да стартираме специален simple_one_for_one Supervisor като част от нашия Application, който да отговаря за тези задачи. Този Supervisor ще създава задачи, които могат и да излязат с грешка, но няма да убият процеса, който ги използва. Даже ще връщат резултат ако са създадени с правилната функция.

Ако си спомняте, в Blogit основният Supervisor имаше следните спецификации:

children = [
  supervisor(Blogit.Components.Supervisor, []),
  supervisor(Task.Supervisor, [[name: :tasks_supervisor]]),
  worker(Blogit.Server, [repository_provider])
]

Точно този :tasks_supervisor се използва за създаване на задачи, които проверяват за промени в git repository-то зададено на Blogit. Ако то не е достъпно поради някаква причина, те ще излязат с грешка, но това няма да убие Blogit.Server процеса, който ги използва.

Всъщност Blogit.Server процесът си създава по една такава задача на даден период от време така:

Task.Supervisor.async_nolink(
  :tasks_supervisor, Blogit.Logic.Updater, :check_updates, [state]
)

По този начин с async_nolink, нямаме link, но имаме монитор, затова процесът, който го извиква ще получи съобщението си.

В Task.Supervisor има други функции за създаване на задачи с и без връзка, за повече информация вижте документацията.

GenServer и Task

Интересна е ситуацията, когато създадем задача в GenServer (или нещо базирано или подобно на него) с Task.Supervisor.async_nolink. Ако не използваме Task.await/2, можем да си дефинираме handle_info callback, който ще бъде извикан с резултата от задачата във формата {ref, result}.

Това може да се използва от GenServer процеси, които обслужват клиентски заявки и не трябва да губят време с код, който би могъл да отнеме време. Кодът в процес е sequential затова ако клиентски процес направи заявка към такъв GenServer процес, а той се занимава с тежко изчисление, например, ще имаме timeout. Ако обаче такива тежки изчисления се правят в задачи, които по някое време ни ги върнат чрез handle_info callback, променяйки състоянието на GenServer процеса, той ще остане responsive. Тази стратегия е подходяща и за request-и към отдалечени ресурси.

Именно затова я ползваме и в Blogit. Проверката за промени с git се прави с git fetch, което обикновено е заявка към отдалечен сървър. Не трябва код, който управлява компонентите, използвани от потребителски процеси да зависи от заявки към отдалечени ресурси.

Важното в подобни случаи е да дефинираме и още един, допълнителен handle_info callback:

def handle_info({:DOWN, _ref, :process, _pid, _status}, state)

Това е важно, защото както и да завърши изпълнение всяка задача ще изпрати подобно съобщение на GenServer-а, все пак той я наблюдава с монитор. Ако не дефинираме такъв callback ще има грешка че не обработваме съобщение, което ни е изпратено, което би убило процеса.

Заключение

Тази кратка статия бележи края на поредицата свързана с OTP поведенията и модулите около тях. Има още. Следващите статии, обаче, са на тема мета-програмиране. След тях е възможно да се върнем към OTP и неговите бази данни, поведения и инструменти.