Процеси и OTP : GenServer
Какво е OTP
?
Всъщност е много неща. От една страна OTP
е платформата с която
се разпространява Erlang
. Версиите на Erlang
са версии на OTP
. Когато си
пуснем iex
, виждаме нещо такова : Erlang/OTP 19
. Тоест Erlang
и OTP
са като едно цяло.
OTP
е и стандартната библиотека на Erlang
.
Съкращението идва от Open Telecom Platform, но с времето OTP
се е превърнало в нещо много по-голямо от платформа за писане на телекомуникационни програми.
В момента изискванията за web
разработка са много подобни на изискванията за писане на телекомуникационни програми.
И OTP
е решение.
Платформата OTP
идва със следните неща:
- Интерпретатор и компилатор на
Erlang
. - Стандартните библиотеки на
Erlang
. Dialyzer
, за който говорихме в Тип-спецификации и поведения.Mnesia
- дистрибутирана база данни.ETS
- база данни в паметта.- Дебъгер.
- И много други…
Това, което ще разгледаме в тази и следващата статии са три абстракции, предоставени ни от OTP
- Gen
сървъри, Supervisor
-и и OTP application
-и.
Какво е GenServer?
В статията Тип-спецификации и поведения си
говорихме за поведения. Дефинирахме поведение, наречен SynchronousCallHandler
.
Това поведение описваше процес, който държи състояние и предоставя начин за синхронно модифициране или четене на това състояние.
Оказва се че такова поведения, както и други, подобни са често срещано изискване
при работа с Erlang
. Именно затова OTP
идва с поведение наречено gen_server
.
Elixir
, от своя страна го адаптира като поведението GenServer
.
GenServer
представлява процес, представен от модул, който предлага функции за различни,
често срещани случаи при работа с процеси. Функции за синхронна и/или асинхронна комуникация,
функции, които се извикват при създаване на процес или преди унищожаването му,
функции за нотифициране при грешки и за логване.
Когато комбинираме GenServer
-и със Supervisor
процеси лесно можем да създадем fault-tolerant
система.
С помощта на мета-програмиране GenServer
е специален тип поведение - поведение с функции, които имат имплементации по подразбиране.
За да направим от един модул GenServer
, използваме use GenServer
. Ще си говорим по-подробно за use
, когато си говорим за мета-програмиране.
defmodule MyWorker do
use GenServer
end
По този начин декларираме, че даден модул ще изпълни поведението GenServer
.
Това поведение и логиката около него ни дават следните възможности:
- Да стартираме процес.
- Да поддържаме състояние в този процес.
- Да получаваме
request
-и и да връщаме отговори. - Да спираме процеса.
Нека започнем с пример и след това ще разгледаме самото поведение.
Пример : GenServer и Blogit - Posts component
Backend-ът на блога, който четете е пример за използване на OTP
логика.
Примерът е Blogit.Components.Posts
, който е GenServer
, чието състояние са всички блог-постове.
Blogit
е OTP application
, който от дадено git repository
с markdown
файлове
създава блог с постове със съдържание, съдържанието на тези markdown
файлове
като HTML
.
Един Post
представлява следната структура:
defmodule Blogit.Models.Post do
alias Blogit.Models.Post.Meta
@type t :: %__MODULE__{
name: String.t, raw: String.t, html: String.t, meta: Meta.t
}
@enforce_keys [:name, :raw, :html, :meta]
defstruct [:name, :raw, :html, :meta]
end
Структурата държи име на поста, което е уникален идентификатор, суровото markdown
съдържание, html
съдържанието и мета-информация.
Искаме да имаме компонент, който да държи всички постове от дадено git repostiry
.
Както и да можем да правим заявки към него като ‘дай пост по уникално име’,
‘дай всички постове сортирани по дата на създаване’,
‘осъвремени си постовете’ и други.
За това, разбира се, можем да ползваме Agent
, но GenServer
ни дава повече свобода
и по лесен начин за правене на каквито и да е заявки към дадено състояние.
Всъщност Agent
е имплементация на GenServer
.
Нека разгледаме Blogit.Components.Posts
:
defmodule Blogit.Components.Posts do
use GenServer
alias Blogit.Models.Post
# Client functions
def start_link() do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
# Server callbacks
def init(_) do
send(self(), :init_posts)
{:ok, nil}
end
def handle_info(:init_posts, nil) do
posts = GenServer.call(Blogit.Server, :get_posts)
{:noreply, posts}
end
def handle_cast({:update, new_posts}, _) do
{:noreply, new_posts}
end
def handle_call({:list, from, size}, _from, posts) do
result = Map.values(posts)
|> Post.sorted |> Enum.drop(from) |> Enum.take(size)
{:reply, result, posts}
end
def handle_call({:by_name, name}, _from, posts) do
case post = posts[name] do
nil -> {:reply, :error, posts}
_ -> {:reply, post, posts}
end
end
end
Това е скъсена имплементация, но можем да видим различни функции от GenServer
и как се ползват те.
Можем да разделим функциите на две категории:
- Такива, които се извикват от процес-клиент.
- Такива, които имплементират
GenServer
поведението.
Обикновено функцията за стартиране на GenServer
процес се нарича start_link
.
В случаят по-горе се стартира GenServer
имплементиран от текущия модул. Даваме му име - модула.
По този начин ще можем да го адресираме така:
GenServer.call(Blogit.Components.Posts, {:list, 0, 5})
Това ще изпрати съобщение, което ще се обработи от функцията, дефинирана като
handle_call({:list, from, size}, _from, posts)
.
Когато се стартира GenServer
, ако сме му дефинирали init/1
функция, се извиква тя.
Една такава функция може да преустанови изпълнението на процеса преди още да е започнал работа.
Скоро ще видим какво може да връща тя. Засега е важно да знаем, че обикновено тя връща {:ok, <състояние>}
.
Можем да приемем init/1
като конструктор на GenServer
процеса.
В горния случай сме избрали да отложим инициализацията за малко по-късно.
Това е специфично за Blogit
програмата и нейното устройство, но ако състоянието на един GenServer
процес
зависи от друг процес, това е начинът. Процесът ни трябва да е активен и готов да приема съобщения, което се случва
точно след като init
върне {:ok, <състояние>}
.
Тъй като GenServer
е просто процес, можем да му пращаме обикновени съобщения без особена структура със send
.
Което и правим. Такива съобщения се обработват с handle_info
функции.
Нашата handle_info(:init_posts, nil)
, приема два аргумента съобщение и текущо състояние.
Всяка от handle_*
функциите може да промени състоянието на GenServer
процеса, като го върне като последен елемент
на кортеж-резултата си. В нашия случай, взимаме постовете от друг GenServer
процес, за който ще говорим по-късно,
използвайки GenServer.call/2
. Тази функция изпраща съобщения синхронно - тоест чака за отговор.
За handle_info
е нормално да върне {:noreply, <ново–състояни>}
. Асинхронните хендлъри връщат :noreply
статуси,
докато синхронните reply
.
След като инициализацията се изпълни, процесът започва живота си, чакайки за съобщения.
Това е имплементирано с безкрайна рекурсия, чакаща на receive
.
Виждате, че имаме handle_cast
и handle_call
хендлъри на съобщения. Разликата е много проста:
handle_cast
се използват за асинхронна комуникация, а handle_call
за синхронна.
Всъщност ние имплементирахме проста версия на handle_call
поведението в SynchronousCallHandler
примера.
В Blogit.Components.Posts
имплементацията, използваме handle_cast({:update, <ново-състояние>}, <текущо-състояние>)
за да може друг процес да обнови постовете. Това се случва, когато има промяна в git
файловете.
Връщаме {:noreply, <ново-състояние>}
за да променим състоянието.
Можем да пратим съобщение, което ще се обработи от тази handle_cast
функция така:
GenServer.cast(Blogit.Components.Posts, {:update, new_posts})
Всъщност състоянието представлява Map
с ключове уникалните имена на публикации и стойности самите Blogit.Models.Post
структури.
Нека разгледаме:
def handle_call({:list, from, size}, _from, posts) do
result = Map.values(posts)
|> Post.sorted |> Enum.drop(from) |> Enum.take(size)
{:reply, result, posts}
end
Това е заявка към списък от постове, сортирани по датата си на създаване (най-новите първо) от дадена позиция и колко на брой.
Имплементацията взима стойностите на Map
-a състояние, сортира ги, премахва тези преди индекса from
и взима бройка равна или по-малка от size
.
Тъй като е синхронно извикване резултатът е {:reply, <резултат>, <състояние>}
. Много подобно поведение на нашия SynchronousCallHandler.handle_call
.
Виждате колко лесно може да се построи сървър на състояние със специфични съобщения и инициализация с помощта на GenServer
.
Целият код за рекурсията държаща състоянието и съпоставянето на съобщения към клаузи, който написахме в SynchronousCallHandler
, а и по-сложни случаи
е поет от GenServer
. Затова е толкова популярен начин за създаване на процеси. В production Erlang/Elixir
се ползва GenServer
вместо spawn/spawn_link/spawn_monitor
,
защото поема доста boilerplate
код.
Поведението GenServer
Ще разгледаме всяка от опционалните функции, които един GenServer
може да имплементира.
За дадена функция, ще разгледаме параметрите ѝ, поведението ѝ и какви резултати се очаква да връща.
init/1
@type args :: any
@type state :: any
@type reason :: any
@type timeout :: non_neg_integer
@type init_result ::
{:ok, state} |
{:ok, state, timeout | :hibernate} |
:ignore |
{:stop, reason}
@spec init(args) :: init_result
Можем да приемем init
за конструктор на GenServer
процеса. Тя приема аргументи
от какъвто и да е тип и връща състоянието на процеса. За множество аргументи можем да използваме списък.
Когато GenServer.start_link/3
се извика с даден модул, ако той дефинира init/1
, то тя ще се извика като конструктор.
Поведението по подразбиране, което се изпълнява, ако не дефинираме тази функция е да се използват аргументите като състояние:
def init(args), do: {:ok, args}
Функцията GenServer.start_link/3
взима модул като първи аргумент, аргументите, които подава на init/1
като втори и keyword
списък от опции като трети.
Такава опция видяхме по-горе - name
. Има и други, както е видно от документацията.
Както казахме, ако върнем {:ok, <състояние>}
, процесът стартира с това състояние.
Ако пък върнем {:ok, <състояние>, timeout-в-милисекунди}
, процесът ще стартира със състоянието, и ако за даденото време не получи никакво съобщение,
ще получи автоматично съобщение :timeout
, което може да прихване ако се дефинира handle_info(:timeout, <състояние>)
.
Ако init/1
върне {:ok, <състояние>, :hibernate}
, процесът ще хибернира.
Хиберниран процес остава хиберниран, докато не получи съобщение. Ако има съобщение в опашката му за съобщения - това е веднага.
Хибернирането пуска Garbage Collector
-а на heap
-а на този процес. Това е добър отговор ако скоро не се очаква съобщение, защото GC
операциите не са леки.
Ако инициализирането зарежда голям ресурс в паметта на процеса и връща малка част от него е добре да върнем {:ok, <състояние>, :hibernate}
Ако init/1
върне :ignore
, процесът ще излезе нормално и start_link
ще върне :ignore
.
Ако init/1
върне {:stop, reason}
, процесът ще излезе с върнатата причина и start_link/3
ще върне {:error, reason}
.
Когато init/1
върне някой от {:ok, state}
вариантите, GenServer.start_link/3
ще върне {:ok, pid}
.
Всъщност ние имплементирахме подобна функция за SynchronousCallHandler
.
handle_call
@type from :: {pid, ref}
@type handle_call_result ::
{:reply, reply, new_state} |
{:reply, reply, new_state, timeout | :hibernate} |
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason, reply, new_state} |
{:stop, reason, new_state} when reply: term, new_state: term, reason: term
@spec handle_call(request :: term, from, state :: term) :: handle_call_result
Функциите handle_call
се извикват когато GenServer
-а получи съобщение,
изпратено от GenServer.call/3
.
Функцията GenServer.call/3
изпраща съобщение до GenServer
процес и чака
за отговор. Това е синхронна комуникация. Първият ѝ аргумент е pid
или име
на GenServer
процес, вторият - съобщението, което трябва да се изпрати, а третият е timeout
в милисекунди.
След като изтече този timeout
, call
спира да чака.
Какво получаваме е handle_call
?
- Съобщението, по което може да се
pattern match
-ва и така да имаме много версии наhandle_call
. - Наредена двойка от
pid
-а на извикващия процес и уникална референция за това извикване. - Състоянието на
GenServer
процеса.
Ако върнем с {:reply, <отговор>, <състояние>}
, ще върнем отговорът като резултат на GenServer.call\3
и
ще продължим със състоянието, което връщаме. Можем да очакваме подобно на init/1
поведение ако timeout
или :hibernate
са част от резултата. При timeout
процесът ще получи :timeout
съобщение след зададеното време, ако няма нови съобщения,
а при :hibernate
ще се включи GC
.
Ако отговорът е noreply
, процесът извикал GenServer.call/3
няма да получи отговор и ще чака.
Всеки процес може да отговори с GenServer.reply(from, reply)
. Нужно е само from
да е точно тази наредена двойка,
която е получена в handle_call
. Има три основни причини да върнем noreply
от handle_call
:
- Защото сме отговорили с
GenServer.reply/2
преди да върнем резултат. Това е когато знаем какво трябва да се отговори, но трябва да извършим някаква бавна операция преди да излезем отhandle_call
. - Защото ще отговори след като
handle_call
е свършила изпълнението си. Това е ако все още не знаем какво трябва да отговорим. Да речем има заявка към другGenServer
, за която чакаме отговор. - Защото някой друг процес трябва да отговори. Да речем някакъв background процес.
Връщаме :stop
резултат, когато искаме да прекратим изпълнението на GenServer
процеса.
В този случай ще се извика terminate(reason, state)
ако е дефинирана и процесът ще прекрати изпълнение с причина - зададената причина.
Поведението по подразбиране на handle_call
ако не е дефинирана за дадено съобщение е да върне {:stop, {:bad_call, request}, state}
.
handle_cast
@spec handle_cast(request :: term, state :: term) ::
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason :: term, new_state} when new_state: term
Както споменахме handle_cast
функциите се изпълняват при асинхронна комуникация.
Това става чрез извикването на GenServer.cast(pid|name, request)
, която винаги връща :ok
и
не чака за отговор.
Функциите handle_cast
приемат изпратеното съобщение и текущото състояние.
Могат да върнат :noreply
резултати, които се държат по същия начин като тези на handle_call
, но нямат reply
част.
Могат и да върнат :stop
резултат, който ще извика terminate/2
ако е дефинирана и ще прекрати процеса с дадената причина.
Тези дефиниции най-често се използват за промяна на състоянието, както в нашия пример : handle_cast({:update, new_posts}, _)
.
handle_info
@spec handle_info(msg :: :timeout | term, state :: term) ::
{:noreply, new_state} |
{:noreply, new_state, timeout | :hibernate} |
{:stop, reason :: term, new_state} when new_state: term
Използват се за прихващане на всякакви други съобщения, да речем такива, пратени със send
.
Приемат и връщат аналогични параметри/резултати на тези на handle_cast
.
terminate
@type reason :: :normal | :shutdown | {:shutdown, term} | term
@spec terminate(reason, state :: term) :: term
Извиква се преди терминиране на GenServer
процес. Причината идва от резултат от типа {:stop, ...}
върнат от handle_*
функциите,
Или от exit
сигнал, ако GenServer
процеса е системен процес. Supervisor
процеси могат да пращат EXIT
съобщения, които да се предадат на тази функция.
code_change
Извиква се когато кодът на процеса се смени по време на изпълнение.
Това е възможно при Erlang/Elixir
. За повече информация прочетете в документацията
format_status
Използва се за специфично представяне на състоянието на GenServer
процес. За повече информация вижте документацията.
Заключение
GenServer
е лесен начин за писане на процеси, поемащ често-използвана логика.
Имплементациите му се вписват добре в supervision
дървото на OTP
програмите.
Ще си поговорим за Supervisor
процесите и OTP
програмите в следващите статии.