Процеси и OTP : Задачи
Последната тема (засега) за процеси. Задачите са процеси, които правят главно едно действие и не комуникират с други процеси, освен когато свършат работата си. Тогава пращат съобщение на този процес, който е причинил тяхното създаване.
Основната им цел е да превърнат прост sequential
код в паралелен.
Пример : паралелен Map версия 2
defmodule PEnum do
def map(enumerable, func) do
enumerable
|> Enum.map(&(Task.async(fn -> func.(&1) end)))
|> Enum.map(&Task.await/1)
end
end
Модулът Task
има две важни функции :
Task.async/1
(има иMFA
версия -Task.async/3
), която създава задача и яlink
-ва към текущия процес, като също така добавя монитор.Task.await/2
, която взима задача и опционалноtimeout
(по подразбиране 5000 - или пет секунди) и чака резултат от задачата.
С помощта на Task
можем да пишем прости background
задачи по лесен начин.
В примера за всеки елемент на подадената колекция извикваме подадената функция в процес-задача и чакаме по ред на създаване на задачите.
Структурата Task
Една задача представлява структура с три полета.
Task.async(fn -> :nothing end)
# %Task{owner: #PID<0.2724.0>, pid: #PID<0.2727.0>, ref: #Reference<0.0.3.551>}
Полетата представляват:
owner
- Процесът, който ще получи съобщение, когато задачата приключи.pid
-pid
-ът на процеса на задачата.ref
- Идва от мониторът прикрепен към задачата.
Функцията Task.await/2
извиква един receive
и използва тези три полета да изчака отговор.
Защо задачата е с link към процеса, който я създава?
Защото ако той ‘умре’, задачата няма къде да върне резултата си, затова трябва да бъде изчистена.
Ако пък задачата ‘умре’ значи нещо не е наред. Тя трябва да прави нещо наистина просто и не би трябвало да свърши живота си с причина, различна от :normal
.
Задача може да бъде стартирана със Task.start/1
или Task.start/3
, тогава няма да бъде link
-ната към текущия процес, но и няма да изпрати резултат от изпълнението си.
За повече информация и други функции свързани с Task
, вижте документацията.
Task.Supervisor
Понякога искаме задачите да изпращат резултат, но да не се link
-ват към текущия процес.
Да речем това са задачи, които си говорят с отдалечен компонент/service
и биха могли да получат грешка отвън.
Бихме могли да стартираме специален simple_one_for_one Supervisor
като част от нашия Application
, който да отговаря за тези задачи.
Този Supervisor
ще създава задачи, които могат и да излязат с грешка, но няма да убият процеса, който ги използва.
Даже ще връщат резултат ако са създадени с правилната функция.
Ако си спомняте, в Blogit
основният Supervisor
имаше следните спецификации:
children = [
supervisor(Blogit.Components.Supervisor, []),
supervisor(Task.Supervisor, [[name: :tasks_supervisor]]),
worker(Blogit.Server, [repository_provider])
]
Точно този :tasks_supervisor
се използва за създаване на задачи, които проверяват за промени в git repository
-то зададено на Blogit
.
Ако то не е достъпно поради някаква причина, те ще излязат с грешка, но това няма да убие Blogit.Server
процеса, който ги използва.
Всъщност Blogit.Server
процесът си създава по една такава задача на даден период от време така:
Task.Supervisor.async_nolink(
:tasks_supervisor, Blogit.Logic.Updater, :check_updates, [state]
)
По този начин с async_nolink
, нямаме link
, но имаме монитор, затова процесът, който го извиква ще получи съобщението си.
В Task.Supervisor
има други функции за създаване на задачи с и без връзка, за повече информация вижте документацията.
GenServer и Task
Интересна е ситуацията, когато създадем задача в GenServer
(или нещо базирано или подобно на него) с Task.Supervisor.async_nolink
.
Ако не използваме Task.await/2
, можем да си дефинираме handle_info callback
, който ще бъде извикан с резултата от задачата във формата {ref, result}
.
Това може да се използва от GenServer
процеси, които обслужват клиентски заявки и не трябва да губят време с код, който би могъл да отнеме време.
Кодът в процес е sequential
затова ако клиентски процес направи заявка към такъв GenServer
процес, а той се занимава с тежко изчисление, например, ще имаме timeout
.
Ако обаче такива тежки изчисления се правят в задачи, които по някое време ни ги върнат чрез handle_info callback
, променяйки състоянието на GenServer
процеса, той ще остане responsive
.
Тази стратегия е подходяща и за request
-и към отдалечени ресурси.
Именно затова я ползваме и в Blogit
. Проверката за промени с git
се прави с git fetch
, което обикновено е заявка към отдалечен сървър. Не трябва код, който управлява компонентите, използвани от потребителски процеси
да зависи от заявки към отдалечени ресурси.
Важното в подобни случаи е да дефинираме и още един, допълнителен handle_info callback
:
def handle_info({:DOWN, _ref, :process, _pid, _status}, state)
Това е важно, защото както и да завърши изпълнение всяка задача ще изпрати подобно съобщение
на GenServer
-а, все пак той я наблюдава с монитор.
Ако не дефинираме такъв callback
ще има грешка че не обработваме съобщение, което ни е изпратено, което би убило процеса.
Заключение
Тази кратка статия бележи края на поредицата свързана с OTP
поведенията и модулите около тях.
Има още. Следващите статии, обаче, са на тема мета-програмиране.
След тях е възможно да се върнем към OTP
и неговите бази данни, поведения и инструменти.