Процеси и OTP : Задачи

Последната тема (засега) за процеси. Задачите са процеси, които правят главно едно действие и не комуникират с други процеси, освен когато свършат работата си. Тогава пращат съобщение на този процес, който е причинил тяхното създаване.
Основната им цел е да превърнат прост sequential код в паралелен.
Пример : паралелен Map версия 2
defmodule PEnum do
def map(enumerable, func) do
enumerable
|> Enum.map(&(Task.async(fn -> func.(&1) end)))
|> Enum.map(&Task.await/1)
end
end
Модулът Task има две важни функции :
Task.async/1(има иMFAверсия -Task.async/3), която създава задача и яlink-ва към текущия процес, като също така добавя монитор.Task.await/2, която взима задача и опционалноtimeout(по подразбиране 5000 - или пет секунди) и чака резултат от задачата.
С помощта на Task можем да пишем прости background задачи по лесен начин.
В примера за всеки елемент на подадената колекция извикваме подадената функция в процес-задача и чакаме по ред на създаване на задачите.
Структурата Task
Една задача представлява структура с три полета.
Task.async(fn -> :nothing end)
# %Task{owner: #PID<0.2724.0>, pid: #PID<0.2727.0>, ref: #Reference<0.0.3.551>}
Полетата представляват:
owner- Процесът, който ще получи съобщение, когато задачата приключи.pid-pid-ът на процеса на задачата.ref- Идва от мониторът прикрепен към задачата.
Функцията Task.await/2 извиква един receive и използва тези три полета да изчака отговор.
Защо задачата е с link към процеса, който я създава?
Защото ако той ‘умре’, задачата няма къде да върне резултата си, затова трябва да бъде изчистена.
Ако пък задачата ‘умре’ значи нещо не е наред. Тя трябва да прави нещо наистина просто и не би трябвало да свърши живота си с причина, различна от :normal.
Задача може да бъде стартирана със Task.start/1 или Task.start/3, тогава няма да бъде link-ната към текущия процес, но и няма да изпрати резултат от изпълнението си.
За повече информация и други функции свързани с Task, вижте документацията.
Task.Supervisor
Понякога искаме задачите да изпращат резултат, но да не се link-ват към текущия процес.
Да речем това са задачи, които си говорят с отдалечен компонент/service и биха могли да получат грешка отвън.
Бихме могли да стартираме специален simple_one_for_one Supervisor като част от нашия Application, който да отговаря за тези задачи.
Този Supervisor ще създава задачи, които могат и да излязат с грешка, но няма да убият процеса, който ги използва.
Даже ще връщат резултат ако са създадени с правилната функция.
Ако си спомняте, в Blogit основният Supervisor имаше следните спецификации:
children = [
supervisor(Blogit.Components.Supervisor, []),
supervisor(Task.Supervisor, [[name: :tasks_supervisor]]),
worker(Blogit.Server, [repository_provider])
]
Точно този :tasks_supervisor се използва за създаване на задачи, които проверяват за промени в git repository-то зададено на Blogit.
Ако то не е достъпно поради някаква причина, те ще излязат с грешка, но това няма да убие Blogit.Server процеса, който ги използва.
Всъщност Blogit.Server процесът си създава по една такава задача на даден период от време така:
Task.Supervisor.async_nolink(
:tasks_supervisor, Blogit.Logic.Updater, :check_updates, [state]
)
По този начин с async_nolink, нямаме link, но имаме монитор, затова процесът, който го извиква ще получи съобщението си.
В Task.Supervisor има други функции за създаване на задачи с и без връзка, за повече информация вижте документацията.
GenServer и Task
Интересна е ситуацията, когато създадем задача в GenServer (или нещо базирано или подобно на него) с Task.Supervisor.async_nolink.
Ако не използваме Task.await/2, можем да си дефинираме handle_info callback, който ще бъде извикан с резултата от задачата във формата {ref, result}.
Това може да се използва от GenServer процеси, които обслужват клиентски заявки и не трябва да губят време с код, който би могъл да отнеме време.
Кодът в процес е sequential затова ако клиентски процес направи заявка към такъв GenServer процес, а той се занимава с тежко изчисление, например, ще имаме timeout.
Ако обаче такива тежки изчисления се правят в задачи, които по някое време ни ги върнат чрез handle_info callback, променяйки състоянието на GenServer процеса, той ще остане responsive.
Тази стратегия е подходяща и за request-и към отдалечени ресурси.
Именно затова я ползваме и в Blogit. Проверката за промени с git се прави с git fetch, което обикновено е заявка към отдалечен сървър. Не трябва код, който управлява компонентите, използвани от потребителски процеси
да зависи от заявки към отдалечени ресурси.
Важното в подобни случаи е да дефинираме и още един, допълнителен handle_info callback:
def handle_info({:DOWN, _ref, :process, _pid, _status}, state)
Това е важно, защото както и да завърши изпълнение всяка задача ще изпрати подобно съобщение
на GenServer-а, все пак той я наблюдава с монитор.
Ако не дефинираме такъв callback ще има грешка че не обработваме съобщение, което ни е изпратено, което би убило процеса.
Заключение
Тази кратка статия бележи края на поредицата свързана с OTP поведенията и модулите около тях.
Има още. Следващите статии, обаче, са на тема мета-програмиране.
След тях е възможно да се върнем към OTP и неговите бази данни, поведения и инструменти.