Връзки между процеси
В тази статия ще си говорим за създаване на връзки между процеси. Ще видим и как да наблюдаваме процеси, както и как да реагираме на грешки в процесите или на завършването на логиката им.
Когато процес ‘умре’
Какво става когато един процес завърши изпълнението си? Бива изчистен от паметта и просто спира да съществува. Кой научава за това. По подразбиране никой.
Има начини процесите да са свързани помежду си, така че ако един от тях завърши изпълнението си, другия да разбере за това. Нека имаме:
defmodule Quitter do
def run do
Process.sleep(3000)
exit(:i_am_tired)
end
end
Нека да го пуснем, и след 4-5 секунди да проверим дали е жив:
pid = spawn(Quitter, :run, [])
# След няколко секунди
Process.alive?(pid) #false
Това е нормално поведение, текущият процес не разбира кога Quitter
процесът е завършил.
Нека сега стартираме със spawn_link/3
:
spawn_link(Quitter, :run, [])
# След 3 секунди -> ** (EXIT from #PID<0.202.0>) :i_am_tired
Това което се случва е, че Quitter
процесът излиза, но с това убива и текущия процес.
Те са свързани. Това означава, че ако единият процес излезе с грешка, и другият ще излезе.
Разбира се ако единият процес завърши без грешка, няма да убие другия:
defmodule Quitter do
def run_no_error do
Process.sleep(3000)
end
end
pid = spawn_link(Quitter, :run_no_error, [])
# След няколко секунди
Process.alive?(pid) #false
Има начин за премахване на връзка - Process.unlink/1
:
pid = spawn_link(Quitter, :run, [])
Process.unlink(pid)
# Няна грешка
Подобно на това, можем да се свържем два съществуващи процеса с Process.link/1
.
defmodule Simple do
def run do
receive do
{:link, pid} when is_pid(pid) ->
Process.link(pid)
run()
:links ->
IO.puts(inspect(Process.info(self(), :links)))
run()
:die ->
IO.puts("#{inspect self()} is hit!")
exit(:ouch)
end
end
end
Нека направим пет процеса със Simple.run/0
:
pid1 = spawn(Simple, :run, [])
#PID<0.270.0>
pid2 = spawn(Simple, :run, [])
#PID<0.272.0>
pid3 = spawn(Simple, :run, [])
#PID<0.274.0>
pid4 = spawn(Simple, :run, [])
#PID<0.276.0>
pid5 = spawn(Simple, :run, [])
#PID<0.278.0>
Нека сега да ги свържем така:
pid1 -> pid2 -> pid3 -> pid4 -> pid5
send(pid1, {:link, pid2}) # {:link, #PID<0.272.0>}
send(pid2, {:link, pid3}) # {:link, #PID<0.274.0>}
send(pid3, {:link, pid4}) # {:link, #PID<0.276.0>}
send(pid4, {:link, pid5}) # {:link, #PID<0.278.0>}
Да проверим връзките им:
send(pid1, :links) # {:links, [#PID<0.272.0>]}
send(pid2, :links) # {:links, [#PID<0.270.0>, #PID<0.274.0>]}
send(pid3, :links) # {:links, [#PID<0.272.0>, #PID<0.276.0>]}
send(pid4, :links) # {:links, [#PID<0.274.0>, #PID<0.278.0>]}
send(pid5, :links) # {:links, [#PID<0.276.0>]}
Както споменахме по-горе, е видно че тези връзки са двупосочни. Това е по правилно:
pid1 <-> pid2 <-> pid3 <-> pid4 <-> pid5
Какво ще стане, когато убием pid5
?
send(pid5, :die) #PID<0.278.0> is hit!
Process.alive?(pid1) # false
Process.alive?(pid2) # false
Process.alive?(pid3) # false
Process.alive?(pid4) # false
Process.alive?(pid5) # false
Това означава, че грешката се разпространява транзитивно по link
-овете.
Въпросът е какво представлява тази грешка и можем ли да реагираме от някой от
свързаните процеси на нея така, че той да не бъде ликвидиран?
Съобщени при грешка
Всъщност тази грешка, която транзитивно убива свързаните процеси е специално съобщение. Процеси могат да си комуникират само със съобщения.
Тези специални съобщения се наричат сигнали. Exit
сигналите са ‘тайни’ съобщения,
които автоматично убиват процесите, които ги получат.
За да имаме fault-tolerant
система е добре да имаме способ да убиваме процесите и
и да разбираме кога процес е бил ликвидиран. Тези link
-ове вършат тази работа.
Второто много важно изискване за тази fault-tolerant
система е да има начин да рестартира процеси,
когато те ‘умрат’.
В Elixir
има специален вид процеси - системни процеси. Това са нормални процеси,
които могат да трансформират exit
сигналите, които получават в нормални съобщения.
По този начин можем да имаме процеси, които разбират кога други процеси се терминират.
Това са системни процеси, които държат link
-ове.
Системни процеси
Нормален процес може да стане системен с извикването на Process.flag(:trap_exit, true)
:
defmodule Quitter do
def run do
Process.sleep(3000)
exit(:i_am_tired)
end
end
Process.flag(:trap_exit, true)
spawn_link(Quitter, :run, [])
receive do msg -> IO.inspect(msg); end
# {:EXIT, #PID<0.95.0>, :i_am_tired}
И по точно този начин, ние можем да си построим fault tolerant
система.
Един системен процес може да ‘наблюдава’ други процеси и да слуша за exit
сигнали от тях.
Ако получи такъв сигнал, той може да реши да ги рестартира или да направи нещо по-умно.
Грешки и поведение
Ще разгледаме при какви случаи на грешка или терминиране на процес, какво би било поведението на свързан процес.
action = <?>
system_process = <?>
Process.flag(:trap_exit, system_process)
pid = spawn_link(action)
receive do
msg -> IO.inspect(msg)
end
Случай 1 : При action = fn -> :nothing end
- При
system_process = false
: Нищо няма да се случи. Текущият процес продължава да чака. - При
system_process = true
: Ще видим ‘{:EXIT, <pid>, :normal}
’ на екрана. Текущият процес продължава изпълнение.
При излизане без грешка, сигналът EXIT
е със статус :normal
. Този статус не убива не-системните свързани процеси
Случай 2 : При action = fn -> exit(:stuff) end
- При
system_process = false
: Текущият процес умира с грешка** (EXIT from <pid>) :stuff
. - При
system_process = true
: Ще видим ‘{:EXIT, <pid>, :stuff}
’ на екрана. Текущият процес продължава изпълнение.
Случай 3 : При action = fn -> exit(:normal) end
- При
system_process = false
: Нищо няма да се случи. Текущият процес продължава да чака. - При
system_process = true
: Ще видим ‘{:EXIT, <pid>, :normal}
’ на екрана. Текущият процес продължава изпълнение.
Поведението е аналогично на Случай 1. По този начин можем да излезем нормално от процес ръчно.
Случай 4 : При action = fn -> raise("Stuff") end
- При
system_process = false
: Текущият процес умира с грешка[error] Process <pid> raised an exception ** (RuntimeError) Stuff
. - При
system_process = true
: Ще видим ‘{:EXIT, <pid>, {%RuntimeError{message: "Stuff"}, [{:erl_eval, :do_apply, 6, [file: 'erl_eval.erl', line: 668]}]}}
’ на екрана. Текущият процес продължава изпълнение.
Случай 5 : При action = fn -> throw("Stuff") end
- При
system_process = false
: Текущият процес умира с грешка[error] Process <pid> raised an exception ** (ErlangError) erlang error: {:nocatch, "Stuff"}
. - При
system_process = true
: Ще видим ‘{:EXIT, <pid>, {{:nocatch, "Stuff"}, [{:erl_eval, :do_apply, 6, [file: 'erl_eval.erl', line: 668]}]}}
’ на екрана. Текущият процес продължава изпълнение.
Заключение
Тези пет поведения покриват всички възможни случаи. Както виждате, засега няма начин системния процес да бъде терминиран с какъвто и да е сигнал.
Истината, обаче е, че има начин. Има един много специален статус на exit
сигнал, който може да убие и системен процес.
Функцията Process.exit/2
Тази функция може да се използва, когато искаме от един процес да ликвидираме друг.
defmodule HelloPrinter do
def start_link do
Process.sleep(5000)
IO.puts("Hello")
start_link()
end
end
Process.flag(:trap_exit, true)
pid = spawn_link(HelloPrinter, :start_link, [])
# Започва да печата "Hello" на всеки 5 секунди
Process.exit(pid, :spri_se_be)
# Спира да печата Hello
receive do
msg -> IO.inspect(msg)
end
# {:EXIT, #PID<0.162.0>, :spri_se_be}
По този начин можем да убием всеки не-системен процес. Нека сега да имаме:
defmodule HiPrinter do
def start_link do
Process.flag(:trap_exit, true)
print()
end
defp print do
receive do
{:EXIT, pid, _} -> send(pid, "Няма да стане, батка!")
after
6000 -> IO.puts("Hi!")
end
print()
end
end
Това е системен процес. Той лови exit
сигнали и изпраща съобщение, че не му
действат. На всеки 6 секунди ни казва Hi!
. Искаме да го ликвидираме, но как?
Да видим:
pid = spawn_link(HiPrinter, :start_link, [])
# Почва да печата `Hi!`
exit(pid, :spri)
# Продължава да печата `Hi!`
receive do
msg -> IO.inspect(msg)
end
# 'Няма да стане, батка'
Process.exit(pid, :normal)
# Продължава да печата `Hi!`
receive do
msg -> IO.inspect(msg)
end
# 'Няма да стане, батка'
Process.exit(pid, :kill)
# Спира да печата!
receive do
msg -> IO.inspect(msg)
end
# {:EXIT, #PID<0.175.0>, :killed}
И така exit
с атома kill
, може да убие всякакъв процес, даже системен.
И съобщението има статус killed
.
Забелязвате че статусът от kill
стана killed
. Това е добре. Нашият процес
беше свързан с този, който ‘убихме’, а не искаме сигналът ни да рикушира обратно.
Сигналът kill
не е транзитивен към връзките.
Нека сега да изследваме ситуациите, които могат да се получат, когато използваме Process.exit/2
.
Ликвидиране и поведение
Имаме:
action = <?>
system_process = <?>
status = <?>
Process.flag(:trap_exit, system_process)
pid = spawn_link(action)
Process.exit(pid, status)
receive do
msg -> IO.inspect(msg)
end
Случай 1 : При action = fn -> Process.sleep(20_000) end
и status = :normal
- При
system_process = false
: Процесът с pidpid
умира след 20 секунди със статус:normal
. Текущият процес не е системен и не получава съобщение, затова продължава да чака неопределено време да получи някакво съобщение вreceive
. - При
system_process = true
: След 20 секунди получаваме съобщение{:EXIT, <pid>, :normal}
.
Случай 2 : При action = fn -> Process.sleep(20_000) end
и status = :stuff
- При
system_process = false
: Грешка :** (EXIT from <pid>) :stuff
. Текущият процес ‘умира’. - При
system_process = true
: Получаваме съобщение{:EXIT, <pid>, :stuff}
.
Случай 3 : При action = fn -> Process.sleep(20_000) end
и status = :kill
- При
system_process = false
: Грешка :** (EXIT from <pid>) killed
. Текущият процес ‘умира’. - При
system_process = true
: Получаваме съобщение{:EXIT, <pid>, :killed}
.
Случай 4 : При action = fn -> Process.exit(self(), :kill) end
и status = <каквото-и-да-е>
- При
system_process = false
: Грешка :** (EXIT from <pid>) killed
. Текущият процес ‘умира’. - При
system_process = true
: Получаваме съобщение{:EXIT, <pid>, :killed}
.
Случай 5 : При action = fn -> exit(:kill) end
и status = <каквото-и-да-е>
- При
system_process = false
: Грешка :** (EXIT from <pid>) killed
. Текущият процес ‘умира’. - При
system_process = true
: Получаваме съобщение{:EXIT, <pid>, :kill}
. Странно, нали?
exit(:reason)
е различен от Process.exit(<pid>, :reason)
.
exit(:reason)
е нещо като throw
, предизвиква ‘хвърляне’, което, ако не е хванато, ‘убива’ текущия процес.
Когато два процеса са свързани, от тази грешка се произвежда сигнал със съобщение :reason
, който ги ‘убива’.
Ако тази причина е :kill
, тя се променя на :killed
за да не убива системни процеси, които са свързани.
Системните процеси прихващат сигнала.
В случай 5, exit(:kill)
може да бъде хванат с catch
в процеса-източник, който го вика, но не е хванат, затова процесът ‘умира’ с причина :kill
.
Свързаният системен процес получава сигнал :killed
със съобщение оригиналната exit
причина - :kill
, а не kill
-сигнал.
Разбира се е възможно да изпращаме сигнали до себе си. Да речем Process.exit(self(), :kill)
, ще ликвидира
текущия процес, независимо дали е системен.
Наблюдаване на процеси
Свързването на процеси е двупосочно.
Ако единият от тях ‘умре’ другият ще получи EXIT
сигнал.
Този сигнал ще ‘убие’ всички свързани процеси, ако те не са системни.
Често искаме един процес да наблюдава друг без да му праща сигнал, ако случайно ‘умре’. Също така искаме да наблюдаваме процеси без текущият процес да е системен. Ако те ‘умрат’, просто искаме да бъдем нотифицирани с нормално съобщение, за да предприемем нещо.
Това е възможно с монитори:
pid = spawn(fn -> Process.sleep(3000) end)
Process.monitor(pid)
receive do
msg -> IO.inspect(msg)
end
# След 3 секунди ще получим нещо такова
# {:DOWN, #Reference<0.0.3.3915>, :process, #PID<0.348.0>, :normal}
Когато наблюдаваме процес чрез монитор, получаваме DOWN
съобщение ако процесът
прекрати изпълнението си или не съществува.
Добавяме монитор към процес с Process.monitor(pid)
. Тази фунцкия връща референция.
Референциите са специален тип в ELixir
, всяка от тях е уникална за текущия node
.
Когато получим DOWN
съобщението, тази референция ще е втория му елемент.
Четвъртият е PID
-а на процеса, който е завършил изпълнението си, а петият - статус.
Това са същите тези статуси, като при сигналите при свързани процеси.
Наблюдението е еднопосочно. Монитор може да се премахне чрез Process.demonitor(monitor_ref)
:
pid = spawn(fn -> Process.sleep(3000) end)
ref = Process.monitor(pid)
Process.demonitor(ref)
receive do
msg -> IO.inspect(msg)
after
4000 -> IO.puts("Няма съобщения...")
end
# Ще видим 'Няма съобщения...'
Има и версия на spawn
, която създава нов процес и автоматично му добавя монитор - spawn_monitor
:
{pid, ref} = spawn_monitor(fn -> Process.sleep(3000) end)
Process.exit(pid, :kill)
receive do
msg -> IO.inspect(msg)
end
# {:DOWN, <ref>, :process, <pid>, :killed
Най-добре ползвайте spawn_monitor
(както и spawn_link
) вместо Process.monitor
(Process.link
).
Има някаква възможност процеса, който искате да наблюдавате (или да свържете с текущия) да не съществува вече,
когато извикате Process.monitor
(Process.link
) и така текущият процес никога да не получи съобщение (сигнал).
Можете да приемете това като първият race condition
, който ви показваме в Elixir
.
Ползвайте spawn_*
функциите, те са атомарни.
Заключение
Разгледахме как да свържем два процеса или да наблюдаваме един от друг.
Когато даден процес завърши изпълнение, чрез тези връзки или монитори, друг може
да предприеме нещо. Това ни дава възможност да построим система, която при проблем
знае как да реагира и да остане функционираща.
OTP
ни дава механизми на по-високо ниво за постигаме на fault tolerance
, но те
също ползват връзки и монитори.
В следващата статия ще разгледаме как един процес може да пази състояние и как да достъпваме и променяме това състояние.