Конкурентно програмиране II : Процеси
В тази публикация ще си говорим за основните градивни единици на всяка програма в Elixir - процесите. Досега ги споменавахме на доста места, защото нямаше как да избегнем това. Всъщност процесите в Elixir идват от Erlang. Когато включим и процесите в описанието на езика, за Elixir можем да кажем следното:
Elixir е език изграден от няколко ‘слоя’, като всеки от тях е надграждане на по-долните слоеве:
- Най-ниският слой е функционален Elixir - функционалният език с който се занимавахме досега. Доста стандартен функционален език с някои приятни идеи като съпоставянето на образци и pipe оператора.
- Следващият слой е конкурентен Elixir - Добавяйки процесите и съобщенията, които си обменят, получаваме точно това. Във всеки процес върви функционален код. Самите процеси не са функционални структури. Те се изпълняват конкурентно един на друг.
- С библиотеките на OTP и създаването на връзки и монитори между процеси имаме fault-tolerant Elixir.
- Ако добавим и отдалечена комуникация между процеси, получаваме дистрибутиран Elixir.
В тази публикация ще си говорим за конкурентния Elixir.
Erlang и неговата история
За да разберем защо моделът за конкурентност на Elixir е такъв, какъвто е, трябва да погледнем назад. Трябва да погледнем към Erlang и причините на създаването му.
Както може би знаете, Erlang е създаден в лаборатория на Ericsson през 80-те години. Основната му идея е да е способ за писане на конкурентни програми, които трябва да могат да се изпълняват безкрайно.
Всъщност изискването, поставено на Joe Armstrong е да измисли “по-добър начин за писане на телекомуникационни програми”. Още 80-те години, този тип програми са имали основно изискване да бъдат конкурентни (една програма трябва да може да поддържа хиляди едновременни транзакции). Освен това, задължително е трябвало да бъдат толерантни към грешки и проблеми, както софтуерни, така и хардуерни. Както и да имат практически нулев downtime. С други думи : да са винаги работещи, кодът им да може да се заменя с по-нови версии, докато те работят.
Тогавашните проблеми при телекомуникационните програми са всъщност днешните проблеми на сървърите със стотици хиляди потребители. Именно затова езици като Elixir и Erlang набират популярност.
В средата на 80-те Ericsson са използвали устройства наречени AXE, за осъществяване на връзка между потребители. Те са се програмирали на език, наречен PLEX, който е работил за горните изисквания, но е бил много труден за употреба и тясно свързан с хардуера на това устройство. Идеята на Erlang е била да бъде нещо като PLEX, но да може да върви на различни типове хардуер, както и да е по-бърз и лесен за писане.
В началото Erlang е бил повлиян от PLEX. При PLEX информацията се е копирала от компонент на компонент за да се избегнат грешки със споделен достъп до ресурси. Софтуерът е следвал следната спецификация:
- Множество паралелни процеси живеят в паметта.
- По всяко време, повечето от тях чакат събитие, което може да е провокирано или от съобщение пратено към тях, или от timer.
- Когато даденото събитие се случи, процесът прави някакво малко изчисление, променя някак състоянието си или изпраща съобщение до други процеси. След това пак започва да чака за ново събитие.
Тези процеси е трябвало да са много леки и лесни за създаване, което е отново наследено от AXE/PLEX системите. Това значи, че процесите трябва да са част от самия език, а не от операционната система, на която той се изпълнява.
Друго нещо, наследено от AXE/PLEX е, че при грешка само текущата транзакция ще бъде невалидна. Грешките в един процес не могат да влияят на другите процеси. Софтуерът трябва да продължи да работи.
По онова време паралелизъм означава множество устройства, които работят с даденият софтуер и се възприемат като едно. Тоест езикът трябва да е лесен за дистрибутивност.
За да са валидни тези свойства. Езикът е разработен така:
- Кодът върви в процеси, които са на ниво език. Подобни на green thread-ове в други езици.
- Тези процеси не споделят памет - имат собствен стек и собствен heap.
- Лесни са за създаване и си комуникират чрез размяна на съобщения.
- Могат да комуникират помежду си, дори да са на различни машини (това се случва малко по-късно в живота на езика).
- Ако един процес ‘умре’, другите продължават да живеят. Може нов да го замести, зависи от стратегията.
Този модел е различен от моделът, популярен навремето, а и сега, в който има споделена памет и нишки работят с различни ресурси, заключвайки и отключвайки ги за достъп.
Езикът Erlang започва като библиотека на Prolog за fault-tolerant програмиране, но се развива бързо като диалект на пролог и после като самостоятелен език. Първият интерпретатор на езика е на Prolog. Erlang е повлиян донякъде и от Smalltalk. Първата имплементация на Joe за изпращане на ‘звънящо’ съобщение до телефон е на Smalltalk. Влиянието на Smalltalk може да се види в размяната на съобщения между процесите. Между другото в Simula, един от първите обектно-ориентирани езици, извикването на метод на обект се нарича, изпращане на съобщения. По същия начин това повлиява за създаването на Actor модела за конкурентност.
Правилно е да се каже, че Erlang не е повлиян от и не имплементира Actor модела. Процесите на Erlang и актьорите имат общ предшественик - комуникацията между обекти със съобщения. Доста от идеите за Актьорите намират своят път в процесите на Erlang независимо от Actor модела. Често грешно наричат Erlang език, имплементиращ Actor модела. Все пак може да се каже, че процесите са актьори (държат се като актьори), чиито вътрешности са функционални. Във всеки процес върви функционален език. Така че вътрешността на един процес няма нищо общо с Actor модела.
И така Erlang се превръща от Prolog, който поддържа конкурентност в обособен език, по който започва да работи още един човек - Robert Virding. Двамата с Joe Armstrong оформят паралелно два интерпретатора на Erlang, написани на Prolog. Това все още остава прототип в лаборатория, но придобива потребители - 3-ма човека. Това че ги има тези хора, тестващи и използващи езика, подпомага много за развитието му. Да речем процесите започват да имат специален буфер, наречен ‘кутия за съобщения’, започват да могат да създават връзки помежду си. Така ако някой от тях получи грешка, друг може да бъде уведомен със специално съобщение и да реагира.
В края на 1989 година, езикът е тестван и функционалността му е намерена за задоволителна. Проблемът е че е много бавен. Излиза ново изискване - да го направят поне 40 пъти по-бърз, което после се увеличава. Така се ражда първата абстрактна машина на Erlang, написана на C - JAM ( Joe’s Abstract Machine ). Преди C и други езици са били разглеждани, други абстрактни машини са били разучавани. Третият човек работил по Erlang, Mike Williams е с доста повече опит от Joe в C, затова той написва JAM.
И така 90-те години бележат началото на изхвърляне на доста Prolog синтаксис от Erlang, по-добри GC стратегии, binary данни над даден размер да се пазят в общ heap за даден node и други.
През 1993 Bogumil (Bogdan) Hausman създава TEAM, прекръстена после на BEAM - доста по-оптимизирана машина за изпълнение на Erlang bytecode.
Две хубави неща се случват за Erlang:
- В края на 1995, проекта за AXE-N устройствата се сгромолясва. Те са щели да работят с друг, специфичен за тях език, а не с Erlang. Това води до широкото използване на устройства, програмирани на Erlang.
- През 1998 година Ericsson Radio AB забранява Erlang за ползване. Решението идва с идеята, фирмата и нейните продукти да не зависят от кръга хора около Erlang. Това обаче кара тези хора да напуснат Ericsson и довежда до отварянето на Erlang.
Първото събитие, сгромолясването на AXE-N, води до широкото използване на Erlang в Ericsson. Това пък е причина за разработката на framework-а OTP. От там нататък Erlang и OTP се разпространяват заедно. OTP съдържа:
- Множество малки и големи помощни библиотеки на Erlang.
- Design Patterns за програмиране на често желани програми.
- Документация, курсове и How to-та
- Mnesia/ETS бази данни
Отварянето на кода на Erlang, води до неговото популяризиране и по-масово използване.
С развитието на IOT и instant messaging програмите, както и програми, обслужващи хиляди request-и на секунда, моделът на Erlang става все по-актуален. От 2006 BEAM започва да поддържа паралелно изпълнение на процесори с множество ядра. А както знаете, преди няколко години Elixir се ражда върху BEAM.
Joe Armstrong нарича Erlang език за конкурентно-ориентирано програмиране, като се базира на няколко правила какво означава това.
Правилата са:
- Системата е изградена от процеси.
- Процесите не споделят нищо.
- Процесите си комуникират чрез асинхронно изпращане на съобщения.
- Процесите са изолирани един от друг.
В заключение:
- 1986 : Erlang е декларативен език с добавена способност за конкурентно изпълнение.
- 1995 : Erlang е функционален език с добавена способност за конкурентно изпълнение.
- 2005 : Erlang е конкурентно-ориентиран език, който се състои от комуникиращи си компоненти, написани на функционален език.
Всичко се върти около Erlang процесите. Езикът е измислен около тях и комуникацията между тях. Тъй като Elixir е роден върху BEAM, можем да кажем същото за него. Нека сега видим как да създаваме и работим с процеси.
Създаване на процеси
Един от начините да създадем нов процес в Elixir е чрез Kernel.spawn/1
:
spawn(fn ->
<изрази>
end)
Нека направим следната функция:
execute_after_action = fn (action, milliseconds) ->
Process.sleep(milliseconds)
result = action.()
IO.puts(result)
end
Идеята е изкуствено да имаме действие, което отнема дадено време да се изпълни.
execute_after_action.(fn -> "Awake!" end, 1000)
# След една секунда ще видим Awake! Дотогава програмата чака.
Сега можем да пуснем това действие да се изпълни в нов процес:
spawn(fn -> execute_after_action.(fn -> "Awake!" end, 1000) end)
IO.puts("Sleeping...")
Това което ще се случи е, че веднага след като пуснем процеса ще видим на екрана текста Sleeping…, а след една секунда ще видим Awake.
Друга форма на spawn
е Kernel.spawn/3
. Тази функция има три аргумента, често наричани MFA.
- M - означава модул, и е точно това - модул.
- F - означава функция и трябва да бъде атом, представляващ името на публична функция от модула
M
. - A - това са аргументите, които трябва да се предадат на тази функция. Те са във формата на списък.
defmodule Executor do
def action_after(action, milliseconds) do
Process.sleep(milliseconds)
IO.puts(action.())
end
end
spawn(Executor, :action_after, [fn -> "Finally!" end, 1000])
# Текущият процес няма да блокира.
# След една секунда ще видим 'Finally!' на екрана.
И така, spawn
е една от функциите за създаване на процеси. Създадените нови процеси
се изпълняват конкурентно на текущия процес. Казваме ‘текущия’, защото и кодът, който написахме
преди малко се изпълнява в процес. Пример е iex, който също се изпълнява в процеси.
Новите процеси се изпълняват конкурентно, а е възможно и да се изпълняват паралелно.
Конкурентност и паралелизъм
Няколко пъти казахме, че Erlang/Elixir е конкурентна платформа. Нека да уточним какво означава това.
Да си представим опашка в магазин - ако магазинерката е само една, всички чакат и си маркират продуктите един след друг. Тук няма никаква конкурентност, всяка транзакция на продукти се случва след приключване на предходната. Ако обаче отвори втора каса, опашките стават две, маркирането на продукти става по конкурентен начин. Две транзакции могат да вървят по едно и също време. В този пример имаме и паралелизъм. Но конкурентност и паралелизъм не са едно и също нещо.
Друг пример - нека имаме автор на статия и редактор. Двамата работят конкурентно върху статията; Авторът пише един абзац, редакторът минава над този абзац и го редактира, през това време авторът чака, след това пише втори абзац, редакторът чака, след това редакторът редактира. Така имаме конкурентност - и двамата работят по статията конкурентно. Можеше да нямаме - авторът първо да напише всичко, а редакторът да редактира след това. Сега паралелно би било следното. Авторът да напише първия абзац, да го прати на редактора и веднага да почне да работи по втория абзац, докато по същото време редакторът редактира първия.
Ако имаме много конкурентни парчета код които се изпълняват в няколко процеса сме конкурентни, ако обаче имаме няколко машини или процесорни ядра, на които се изпълняват сме и паралелни. Да речем AXE устройствата, за които говорихме, са поддържали паралелизъм, защото са били съставени от множество малки switch устройства, на всяко от които са вървели процеси.
Паралелизъм в Elixir
Когато стартираме Elixir, той върви в един OS process или една BEAM инстанция, която наричаме node. За всяко ядро на CPU-то си, обикновено получаваме по една OS-level нишка. Във всяка такава нишка се изпълнява нещото, наречено Scheduler. Тези Scheduler-и обикновено са обвързани с ядро на процесора, но е възможно и да ги сменят. Има си флагове, когато стартираме Elixir за определяне на поведението им.
Какво прави един такъв Scheduler? Управлява опашка, наречена run queue. Това е приоритетна опашка от Elixir процеси и портове. Това значи, че ако имаме четири Scheduler-а, е възможно да имаме четири паралелни Elixir-level процеса, защото всеки от тях е на различно ядро и управлява различна опашка от процеси.
Какво става като извикаме spawn
? Нов процес се създава и се поставя в някоя от опашките на Scheduler-ите.
Обикновено един такъв Elixir-level процес е голям около 1KB-2KB при създаването си (за разлика от OS-level нишките които обикновено взимат няколко мега байта само за стека си).
Това означава, че можем да създаваме огромен брой процеси без да се притесняваме. Говорим за милиони.
Интересно нещо е миграцията между Scheduler-и. Възможно е процес да смени опашката си и да започне да се управлява от друг Scheduler. Има сложен алгоритъм за балансиране на натоварването между ядрата, който е отговорен за тази миграция. Често ако Scheduler остане без работа може да си ‘поиска’ процеси от други опашки. Между другото, ако няма достатъчно работа, някои от Scheduler-ите няма въобще да бъдат стартирани, докато не се наложи. Стартирането и спирането на Scheduler-и е скъпа операция, така че се извършва сравнително рядко и с отлагане.
Добре е да се спомене, че Sheduler-ите освен Elixir процеси управляват и портове. Портовете са начинът по който Elixir комуникира с външния свят. Да речем с такива портове правим комуникация с други OS-level процеси, написани на друг език.
Процесите и портовете имат право на до N редукции. В текущата версия на BEAM това N е 2000. Всяка операция свързана с процес е редукция. Извикването на функция или макрос, изпращане на съобщение, GC в heap-a на дадения процес и така нататък. Когато текущо-изпълняващ се процес изчерпа редукциите си или пък е в очакване на нещо и не прави нищо, той става неактивен и Scheduler-а активизира друг от опашката. Това се отнася и за портовете - IO операции, комуникация с други OS-level процеси, изпращане на съобщения между дистрибутирани BEAM инстанции - всичко струва редукции. По тежките операции са по скъпи.
Тази стратегия на планиране се нарича превантивна ( preemptive ). В общи линии Scheduler-ът решава кога да прекъсне процес, използвайки броя на редукциите, времето или приоритета на процеса. За разлика от повечето езици, които използват кооперативна стратегия, в Elixir, Scheduler-ът може да прекрати изпълнението на задача ‘насила’. По този начин няма как много тежък откъм операции процес да е активен много дълго време, блокирайки всички други в опашката си.
Нека сега да видим как си взаимодействат процесите.
Комуникация между процеси
Има три основни функции за работа с процеси:
spawn
ги създава. Запознахме се с нея.send
изпраща съобщение до процес.receive
чака за съобщения към текущия процес.
Нека пак да разгледаме spawn
.
Вече знаем с какви аргументи работи и какво прави - създава нов процес.
Нека поговорим за това, което връща, именно инстанция на типа PID.
Всъщност PID-ът представлява адреса на процеса.
Може да бъде използван за изпращане на съобщения.
Процесите са напълно изолирани един от друг.
Начинът по който обменят информация е чрез тези съобщения, които копират данни (в повечето случаи) от heap-а на един процес към друг.
Функцията Kernel.send/2
приема PID на процеса, към който искаме да изпратим съобщение и самото съобщение:
pid = spawn(action)
send(pid, message)
#=> message
Третата важна функция при процесите, recieve
, прилича на case
.
Всеки процес има опашка от съобщения и когато дадено съобщение пристигне, то се слага в тази опашка.
При receive
процесът блокира ако няма съобщения в опашката и чака докато поне едно съобщение не влезе в нея.
Когато има такова съобщение, то се pattern match-ва с условията изброени в блока на receive
и ако има успех се изпълнява кодът съответстващ на това условие.
pid = spawn(fn ->
receive do
pattern1 -> action1
pattern2 -> action2
....
patternN -> actionN
end
end)
send(pid, pattern2)
Когато процесът изпълни логиката си, той ‘умира’ (приключва изпълнение).
Ето един истински пример:
pid = spawn(fn ->
receive do
:say_hi -> IO.puts("Hi!")
:say_by -> IO.puts("Bye!")
{:say, name, msg} -> IO.puts([name, " says ", msg])
end
end)
#=> #PID<0.104.0>
send(pid, {:say, "Arnold", "I'll be back!"})
# Arnold says I'll be back!
#=> {:say, "Arnold", "I'll be back!"}
Както казахме receive
е като case
, който се изпълнява върху полученото
съобщение. Съобщението може да е всякакъв тип. Можем да изпратим PID-а
на процеса, който извиква Kernel.send/2
и да го използваме за да получим отговор:
pid = spawn(fn ->
receive do
{sender, :ping} when is_pid(sender) -> send(sender, {self(), :pong})
end
end)
send(pid, {self(), :ping})
IO.puts("Let's wait for a pong!")
receive do
{sender, :pong} when is_pid(sender) ->
IO.puts([inspect(sender), " sends PONG!"])
end
# Ще се отпечата нещо като '#PID<0.150.0> sends PONG!'
Това е начинът по който два процеса могат да си ‘говорят’ - с препращане на PID-овете си.
Със Kernel.self/0
взимаме PID-а на текущия процес.
Така можем да имплементираме и синхронна комуникация - процес A изпраща съобщение на процес B и чака за отговор.
Пример : паралелен Enum.map/2 без да използваме Task
Нека имплементираме map
функцията, която изпълнява трансформация в отделен процес за всеки от елементите на колекция без да ползваме
абстракцията Task
:
defmodule PEnum do
def map(enumerable, map_func) do
enumerable |> Enum.map(spawn_func(map_func)) |> Enum.map(&receive_func/1)
end
defp spawn_func(map_func) do
current_pid = self()
fn x ->
spawn(fn -> send(current_pid, {self(), map_func.(x)}) end)
end
end
def receive_func(pid) do
receive do
{^pid, result} -> result
end
end
end
Дефинираме си нов модул - PEnum
, подобно на Enum
, той съдържа map/2
функция.
Всъщност PEnum.map/2
, използва Enum.map/2
, подавайки ѝ колекцията и функцията от по-висок ред spawn_func
.
Функцията spawn_func
- Приема функция и връща функция, която за дадена стойност
x
създава процес. - Този процес изпраща на адреса на процеса, извикващ
spawn_func
, PID-а си и резултата от функцията (map_func
), подадена наspawn_func
, изпълнена върху стойносттаx
като параметър.
Това звучи сложно, но всъщност просто извикваме Enum.map/2
с функция, която създава процес в който се изпълнява map_func
и връща PID-а му.
И така на този етап имаме списък от PID-ове.
Отново използваме Enum.map/2
, за да можем за всеки PID да изчакаме съобщението от процеса адресиран в него.
Тези PID-ове идват в реда на създаването на процесите им, които пък са създадени по реда на елементите в енумерацията.
- За всеки такъв PID, чакаме за съобщение от него, което е във формата
{PID, <стойност>}
. - Използваме pin оператора защото
pid
е променлива и не искаме да match-не всичко и да промени стойността си. - Искаме да чакаме завършването на процесите по реда по който са създадени.
- Точно това прави
receive_func
.
И така, процесът извикващ PEnum.map/2
, изчаква подред всеки процес, който създава.
В съобщенията се съдържа стойността от map_func
извикана с елементите от енумерацията.
Ето пример:
1..50 |> Enum.map(fn x -> :timer.sleep(1000); x* x end)
# След около 50 секунди ще имаме квадратите
1..50 |> PEnum.map(fn x -> :timer.sleep(1000); x* x end)
# След около секунда ще имаме квадратите
Заключение
Процесите в Elixir са много леки и лесни за употреба, не споделят данни, които могат да мутират и могат да се изпълняват паралелно. В следващата публикация ще разберем повече за тяхното устройство и начинът им на комуникация.