Процеси и състояние


Всеки процес в еликсир изпълнява код, написан на функционален език. Този код е последователен. Самите процеси, обаче, приличат на актьорите в Actor модела. Те са компоненти, които си комуникират чрез съобщения.

Можем ли да разглеждаме един процес като контейнер на някакво състояние, което можем да четем или променяме?

Отговорът е, че можем. И тъй като вътрешността на процесите е функционална, това може да се постигне с рекурсия. В тази статия ще разгледаме процесите като контейнери на данни.

Как да направим от процес контейнер на данни?

Нека имаме следния модул - Wrapper:

defmodule Wrapper do
  def start_link(initial), do: spawn_link(fn -> run(initial) end)

  defp run(value) do
    receive do
      {:get, pid} when is_pid(pid) ->
        send(pid, {:ok, value})
        run(value)
      _ -> run(value)
    end
  end
end

Модулът Wrapper има само една публично-достъпна функция - start_link/1. Нейният параметър е стойността, която се съхранява в процес-опаковка.

Функцията run е скрита, за да не замърсява публичния интерфейс. Именно защото е скрита не можем да ползваме spawn_link/3. Използваме spawn_link вместо spawn с идеята, че кодът на процесът, който използва Wrapper процес е тясно свързан с него. Ако Wrapper процесът се счупи, очакваме и кодът, който си комуникира с него да се счупи.

Какво прави функцията run/1? Всъщност тя държи стойността, като си я предава с рекурсия. Тъй като винаги последното действие в run е извикване на run, получаваме tail call оптимизацията.

Процес създаден с Wrapper.start_link/1, слуша за съобщение {:get, <pid>} и си връща състоянието във формат {:ok, <value>}. Това може да се тества така:

pid = Wrapper.start_link(5)
send(pid, {:get, self()})

receive do
  msg -> IO.inspect(msg)
end
# {:ok, 5}

Също така този процес не задържа съобщения, които не разбира:

pid = Wrapper.start_link(5)
send(pid, :stuff)
:timer.sleep(100) # Чакаме малко, за да сме сигурни, че съобщението е обработено

{:messages, messages} = Process.info(pid, :messages)
Enum.count(messages) == 0
# true

И това е. Този процес ще си живее вечно, защото рекурсията създава безкраен цикъл. Стойността се предава от извикване на извикване на run, така че тя става опаковано състояние.

Синхронно взимане на състоянието

Лесно е да направим така, че стойността на Wrapper да може да се взима синхронно. Добавяме функция, която се справя със send и receive вместо нас:

def get(pid) do
  send(pid, {:get, self()})

  receive do
    {:ok, value} -> {:ok, value}
  end
end

Сега можем да проверим дали всичко работи добре:

pid = Wrapper.start_link(5)
Wrapper.get(pid)
# {:ok, 5}

Това което липсва е код, който се справя с грешки. С текущия код, ако процесът Wrapper ‘умре’, и текущият процес ще ‘умре’. Те са свързани. И все пак текущият процес може да е системен, или може да се получи дълго чакане на резултата. Ето една версия на get, която връща резултат при грешка:

def get(pid, timeout \\ 5000) do
  send(pid, {:get, self()})

  receive do
    {:ok, value} -> {:ok, value}
    anything -> {:error, anything}
  after
    timeout -> {:error, "Timeout"}
  end
end

Промяна на стойността

Искаме да можем да променяме стойността в Wrapper процеса. Това, разбира се, може да стане със съобщение.

receive do
  {:get, pid} when is_pid(pid) ->
    send(pid, {:ok, value})
    run(value)
  {:set, new_value, pid} when is_pid(pid) ->
    send(pid, {:ok, new_value})
    run(new_value)
  _ ->
    run(value)
end

По този начин можем да променим стойността. Ето и тест за това:

pid = Wrapper.start_link(5)
send(pid, {:set, 4, self()})
receive do
  msg -> IO.inspect(msg)
end
# {:ok, 4}

send(pid, {:get, self()})
receive do
  msg -> IO.inspect(msg)
end
# {:ok, 4}

Можем да направим синхронна версия:

def set(pid, new_value, timeout \\ 5000) do
  send(pid, {:set, new_value, self()})

  receive do
    {:ok, value} -> {:ok, value}
    anything -> {:error, anything}
  after
    timeout -> {:error, "Timeout"}
  end
end

Сега можем да използваме тази функция за променяне на стойността.

Частта с receive от Wrapper.get/2 функцията и новата функция са еднакви. В такъв случай е добре да сложим този код в една функция, която да се извиква и от set и от get. По този начин ще преизползваме код.

Промяна на стойността, използвайки текущата стойност

Ще е хубаво да можем да променяме стойността на Wrapper процесите, използвайки текущите им стойности. Проблемът е, че ако използваме get, а после set, някой друг процес може вече да е променил стойността и нашият да я презапише. Това е типичен race condition и е добре да има начин да го заобиколим. Именно затова ще добавим нов вид съобщение, което третира get и set като едно атомарно действие:

receive do
  {:get, pid} when is_pid(pid) ->
    send(pid, {:ok, value})
    run(value)
  {:set, new_value, pid} when is_pid(pid) ->
    send(pid, {:ok, new_value})
    run(new_value)
  {:get_and_update, action, pid} when is_pid(pid) and is_function(action) ->
    new_value = action.(value)
    send(pid, {:ok, new_value})
    run(new_value)
  _ ->
    run(value)
end

При get_and_update съобщение очакваме втората стойност на съобщението да е функция. Извикваме я с текущата стойност за да получим нова стойност. Пример:

pid = Wrapper.start_link(5)
send(pid, {:get_and_update, fn (v) -> v + 2 end, self()})

receive do
  msg -> IO.inspect(msg)
end
# {:ok, 7}

Лесно е да добавим и синхронен вариант:

def get_and_update(pid, action, timeout \\ 5000) when is_function(action) do
  send(pid, {:get_and_update, action, self()})

  receive_value(timeout)
end

Тук receive_value съдържа receive блокът, който е общ за всички синхронни функции.

Прекратяване на Wrapper процес

Това е лесно. Ще направим функция Wrapper.stop/1, която по pid, изпраща съобщение за спиране, при което рекурсията на Wrapper процеса спира:

receive do
  {:get, pid} when is_pid(pid) ->
    send(pid, {:ok, value})
    run(value)
  {:set, new_value, pid} when is_pid(pid) ->
    send(pid, {:ok, new_value})
    run(new_value)
  {:get_and_update, action, pid} when is_pid(pid) and is_function(action) ->
    new_value = action.(value)
    send(pid, {:ok, new_value})
    run(new_value)
  {:stop, pid} when is_pid(pid) ->
    send(pid, :ok)
  _ ->
    run(value)
end

def stop(pid) do
  send(pid, {:stop, self()})

  receive do :ok -> :ok; end
end

Сега можем да проверим това поведение:

pid = Wrapper.start_link(5)
Wrapper.stop(pid)
Process.alive?(pid)
# false

Модулът който направихме доста прилича на клас от обектно-ориентираното програмиране. Процесът му е инстанция и PID-ът я представлява. Държи състояние и функциите които дефинирахме на модула могат да се ползват като методи за достъп и промяна на това състояние.

Интересното тук е, че това състояние не може да бъде споделяно, винаги получаваме негово копие в текущия процес.

Това което имплементирахме си идва с Elixir и се нарича Agent. Всъщност нашият Wrapper е доста бледа и не-толкова-функционална версия на Agent.

Agent

Агентът е проста обвивка около състояние, съхранено в процес. Точно като нашия Wrapper. Разбира се, съхранението на състоянието е имплементирано чрез безкрайна рекурсия.

Да видим как се създава един такъв Agent процес:

{:ok, pid} = Agent.start_link(fn -> 5 end)

При агентите задаваме състоянието си като резултат от анонимна функция. Всъщност почти всяка функция от Agent взима анонимна функция като аргумент:

Agent.get(pid, fn v -> v end)
# 5

Идеята да подаваме функции на Агента е следната: Кодът в тази функция се изпълнява в процеса на Агента, което значи, че можем да си направим проста трансформация с оригиналната стойност и към текущият процес да се копира само резултата. Ние решаваме къде да се изпълни логиката.

Agent.get(pid, fn v -> v + 1 end)
# 6

Като цяло Агентът е абстракция и може да бъде ‘опакован’ в подобен на Wrapper модул, който скрива разни детайли като подаването на тези функции.

Агентите могат да се стартират като връзки, а могат да се стартират и като несвързани процеси, имат си функции за четене, промяна, промяна с ползване на текуща стойност и за асинхронна комуникация. Повече за тях можете да научите от документацията.

Заключение

Видяхме как можем да използваме процесите в Elixir като сървъри със състояние. Ще навлезем още по-навътре в тази тема, когато започнем да се занимаваме с OTP. Преди това, обаче, ще направим една крачка назад и ще разгледаме type-spec синтаксиса в Elixir, инструментът dialyzer и какво е поведение (behaviour).