Процеси и състояние
Всеки процес в еликсир изпълнява код, написан на функционален език.
Този код е последователен.
Самите процеси, обаче, приличат на актьорите в Actor
модела.
Те са компоненти, които си комуникират чрез съобщения.
Можем ли да разглеждаме един процес като контейнер на някакво състояние, което можем да четем или променяме?
Отговорът е, че можем. И тъй като вътрешността на процесите е функционална, това може да се постигне с рекурсия. В тази статия ще разгледаме процесите като контейнери на данни.
Как да направим от процес контейнер на данни?
Нека имаме следния модул - Wrapper
:
defmodule Wrapper do
def start_link(initial), do: spawn_link(fn -> run(initial) end)
defp run(value) do
receive do
{:get, pid} when is_pid(pid) ->
send(pid, {:ok, value})
run(value)
_ -> run(value)
end
end
end
Модулът Wrapper
има само една публично-достъпна функция - start_link/1
.
Нейният параметър е стойността, която се съхранява в процес-опаковка.
Функцията run
е скрита, за да не замърсява публичния интерфейс.
Именно защото е скрита не можем да ползваме spawn_link/3
.
Използваме spawn_link
вместо spawn
с идеята, че кодът на процесът, който използва
Wrapper
процес е тясно свързан с него. Ако Wrapper
процесът се счупи, очакваме и кодът,
който си комуникира с него да се счупи.
Какво прави функцията run/1
?
Всъщност тя държи стойността, като си я предава с рекурсия.
Тъй като винаги последното действие в run
е извикване на run
, получаваме tail call
оптимизацията.
Процес създаден с Wrapper.start_link/1
, слуша за съобщение {:get, <pid>}
и
си връща състоянието във формат {:ok, <value>}
. Това може да се тества така:
pid = Wrapper.start_link(5)
send(pid, {:get, self()})
receive do
msg -> IO.inspect(msg)
end
# {:ok, 5}
Също така този процес не задържа съобщения, които не разбира:
pid = Wrapper.start_link(5)
send(pid, :stuff)
:timer.sleep(100) # Чакаме малко, за да сме сигурни, че съобщението е обработено
{:messages, messages} = Process.info(pid, :messages)
Enum.count(messages) == 0
# true
И това е. Този процес ще си живее вечно, защото рекурсията създава безкраен цикъл.
Стойността се предава от извикване на извикване на run
, така че тя става опаковано състояние.
Синхронно взимане на състоянието
Лесно е да направим така, че стойността на Wrapper
да може да се взима синхронно.
Добавяме функция, която се справя със send
и receive
вместо нас:
def get(pid) do
send(pid, {:get, self()})
receive do
{:ok, value} -> {:ok, value}
end
end
Сега можем да проверим дали всичко работи добре:
pid = Wrapper.start_link(5)
Wrapper.get(pid)
# {:ok, 5}
Това което липсва е код, който се справя с грешки.
С текущия код, ако процесът Wrapper
‘умре’, и текущият процес ще ‘умре’.
Те са свързани. И все пак текущият процес може да е системен, или може да се
получи дълго чакане на резултата. Ето една версия на get
, която връща резултат при грешка:
def get(pid, timeout \\ 5000) do
send(pid, {:get, self()})
receive do
{:ok, value} -> {:ok, value}
anything -> {:error, anything}
after
timeout -> {:error, "Timeout"}
end
end
Промяна на стойността
Искаме да можем да променяме стойността в Wrapper
процеса. Това, разбира се,
може да стане със съобщение.
receive do
{:get, pid} when is_pid(pid) ->
send(pid, {:ok, value})
run(value)
{:set, new_value, pid} when is_pid(pid) ->
send(pid, {:ok, new_value})
run(new_value)
_ ->
run(value)
end
По този начин можем да променим стойността. Ето и тест за това:
pid = Wrapper.start_link(5)
send(pid, {:set, 4, self()})
receive do
msg -> IO.inspect(msg)
end
# {:ok, 4}
send(pid, {:get, self()})
receive do
msg -> IO.inspect(msg)
end
# {:ok, 4}
Можем да направим синхронна версия:
def set(pid, new_value, timeout \\ 5000) do
send(pid, {:set, new_value, self()})
receive do
{:ok, value} -> {:ok, value}
anything -> {:error, anything}
after
timeout -> {:error, "Timeout"}
end
end
Сега можем да използваме тази функция за променяне на стойността.
Частта с receive
от Wrapper.get/2
функцията и новата функция са еднакви.
В такъв случай е добре да сложим този код в една функция, която да се извиква
и от set
и от get
. По този начин ще преизползваме код.
Промяна на стойността, използвайки текущата стойност
Ще е хубаво да можем да променяме стойността на Wrapper
процесите, използвайки
текущите им стойности. Проблемът е, че ако използваме get
, а после set
, някой
друг процес може вече да е променил стойността и нашият да я презапише.
Това е типичен race condition
и е добре да има начин да го заобиколим.
Именно затова ще добавим нов вид съобщение, което третира get
и set
като
едно атомарно действие:
receive do
{:get, pid} when is_pid(pid) ->
send(pid, {:ok, value})
run(value)
{:set, new_value, pid} when is_pid(pid) ->
send(pid, {:ok, new_value})
run(new_value)
{:get_and_update, action, pid} when is_pid(pid) and is_function(action) ->
new_value = action.(value)
send(pid, {:ok, new_value})
run(new_value)
_ ->
run(value)
end
При get_and_update
съобщение очакваме втората стойност на съобщението да е функция.
Извикваме я с текущата стойност за да получим нова стойност. Пример:
pid = Wrapper.start_link(5)
send(pid, {:get_and_update, fn (v) -> v + 2 end, self()})
receive do
msg -> IO.inspect(msg)
end
# {:ok, 7}
Лесно е да добавим и синхронен вариант:
def get_and_update(pid, action, timeout \\ 5000) when is_function(action) do
send(pid, {:get_and_update, action, self()})
receive_value(timeout)
end
Тук receive_value
съдържа receive
блокът, който е общ за всички синхронни
функции.
Прекратяване на Wrapper
процес
Това е лесно. Ще направим функция Wrapper.stop/1
, която по pid
, изпраща
съобщение за спиране, при което рекурсията на Wrapper
процеса спира:
receive do
{:get, pid} when is_pid(pid) ->
send(pid, {:ok, value})
run(value)
{:set, new_value, pid} when is_pid(pid) ->
send(pid, {:ok, new_value})
run(new_value)
{:get_and_update, action, pid} when is_pid(pid) and is_function(action) ->
new_value = action.(value)
send(pid, {:ok, new_value})
run(new_value)
{:stop, pid} when is_pid(pid) ->
send(pid, :ok)
_ ->
run(value)
end
def stop(pid) do
send(pid, {:stop, self()})
receive do :ok -> :ok; end
end
Сега можем да проверим това поведение:
pid = Wrapper.start_link(5)
Wrapper.stop(pid)
Process.alive?(pid)
# false
Модулът който направихме доста прилича на клас от обектно-ориентираното програмиране.
Процесът му е инстанция и PID
-ът я представлява. Държи състояние и функциите които дефинирахме
на модула могат да се ползват като методи за достъп и промяна на това състояние.
Интересното тук е, че това състояние не може да бъде споделяно, винаги получаваме негово копие в текущия процес.
Това което имплементирахме си идва с Elixir
и се нарича Agent
.
Всъщност нашият Wrapper
е доста бледа и не-толкова-функционална версия на Agent
.
Agent
Агентът е проста обвивка около състояние, съхранено в процес. Точно като нашия
Wrapper
. Разбира се, съхранението на състоянието е имплементирано чрез безкрайна рекурсия.
Да видим как се създава един такъв Agent
процес:
{:ok, pid} = Agent.start_link(fn -> 5 end)
При агентите задаваме състоянието си като резултат от анонимна функция.
Всъщност почти всяка функция от Agent
взима анонимна функция като аргумент:
Agent.get(pid, fn v -> v end)
# 5
Идеята да подаваме функции на Агента е следната: Кодът в тази функция се изпълнява в процеса на Агента, което значи, че можем да си направим проста трансформация с оригиналната стойност и към текущият процес да се копира само резултата. Ние решаваме къде да се изпълни логиката.
Agent.get(pid, fn v -> v + 1 end)
# 6
Като цяло Агентът е абстракция и може да бъде ‘опакован’ в подобен на Wrapper
модул,
който скрива разни детайли като подаването на тези функции.
Агентите могат да се стартират като връзки, а могат да се стартират и като несвързани процеси, имат си функции за четене, промяна, промяна с ползване на текуща стойност и за асинхронна комуникация. Повече за тях можете да научите от документацията.
Заключение
Видяхме как можем да използваме процесите в Elixir
като сървъри със състояние.
Ще навлезем още по-навътре в тази тема, когато започнем да се занимаваме с OTP
.
Преди това, обаче, ще направим една крачка назад и ще разгледаме type-spec
синтаксиса в Elixir
,
инструментът dialyzer
и какво е поведение (behaviour
).