Comments 15
Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.Этот момент вызвал сомнение. Значительное время тест не работает, а спит. Т.о. тестируем сон, а не только работу.
Тема действительно интересная, я сам недавно сталкивался с распараллеливанием вычислений в Elixir, поэтому сделал небольшой рефакторинг и добавил основную основную фичу, значительно облегчающую распараллеливание — Task.
Читать по ссылке, а пока предлагаю взглянуть на код:
defmodule Wordcount do
@main_word "лошадь"
defp count(text, word) do
length(String.split(text, word)) - 1
end
defp do_proceed(file_path) do
File.read!(file_path)
|> count(@main_word)
|> (fn (count) -> IO.puts "Found #{count} occurrence(s) of the word in file #{file_path}" end).()
end
def proceed(:async) do
Path.wildcard("./OGENRI/*.txt")
|> Enum.map(&Task.async(fn -> do_proceed(&1) end))
|> Enum.map(&Task.await/1)
end
def proceed(:sync) do
Path.wildcard("./OGENRI/*.txt")
|> Enum.map(&do_proceed/1)
end
### Example:
# benchmark
# benchmark(:sync)
def benchmark(type \\ :async) do
start_time = :os.system_time(:milli_seconds)
proceed(type)
end_time = :os.system_time(:milli_seconds)
IO.puts "Finished in #{(end_time - start_time)} miliseconds"
end
end
Вот конкретные различия между синхронной и асинхронной версией
def proceed(:sync) do
Path.wildcard("./OGENRI/*.txt")
|> Enum.map(&do_proceed/1)
end
def proceed(:async) do
Path.wildcard("./OGENRI/*.txt")
|> Enum.map(&Task.async(fn -> do_proceed(&1) end))
|> Enum.map(&Task.await/1)
end
Пишем синхронный код — потом вуаля! — и он щелчком пальцев превращается в асинхронный. Добро пожаловать в functional programming!)
Такая же искусственная конструкция.
Создание и управление процессами в эрланге — это достаточно нетривиальная штука, потому что надо следить как за ошибками в них, так и за используемыми ресурсами, да и поддерживать целостным дерево процессов что бы можно было остановить всю задачу за корневой.
Просто бездумно форкаться в ожидании успеха по процессу на каждый элемент списка — это годится только для статьи на хабр, но никак не для чего-то, что можно выложить на продакшн.
Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
вы здесь создаете уйму неуправляемых процессов на каждый файл. Вы моментально схлопочете emfile, но узнать об этом не получится, потому что вы даже spawn_link не делаете.
К сожалению, вы демонстрируете очень вредный и неряшливый подход на неверных примерах.
Ваш результат лишь показывает, что спать асинхронно в 12 потоков быстрее, чем спать 12 раз синхронно в одном.
Игрушечный пример на одной машине не показывает оверхеда от передачи данных по сети (который может быть очень значительным), управления задачами (что если нода упадёт?) и т.п.
Кроме того, "классический" MapReduce устроен посложнее, чем у вас написано: редьюсеров обычно больше одного и результаты разделяются между редьюсерами по хэшу ключа и сортируются, это как раз одна из ключевых идей. Такой подход позволяет распараллелить фазу редьюса, избежать перегрузки машины, которая выполняет редьюс (все ключи могут просто не помещаться в память), и терять меньше работы в случае аварийного завершения редьюсера.
Перемножить большие матрицы на небольшом кластере или сделать JOIN независимых датасетов по какому-нибудь общему ключу было бы гораздо более показательно.
Подробнее: http://elixir-lang.org/blog/2016/07/14/announcing-genstage/
На последней конференции он два часа про GenStage говорил.
Почему «может быть»? Я ее уже в продакшене в хвост и в гриву гоняю. Валим специально оговорился, что Experimental
там только из-за возможных коллизий с именами/переименованиями, код — production-ready.
Хосе Валим хороший программист, но он из рельс, а там очень и очень принято делать proof of concept, а потом как срастется.
Как срастется в случае с эрлангом означает OOM и рассыпающиеся штуки под нагрузкой.
Какие у вас цифры на хвосте и гриве? Сколько трафика, сколько событий, как часто срабатывает система защиты от перегрузки внутри пайплайна, какая у вас запланированная реакция системы на перегруз внутренних частей пайплайна?
Запланированной реакции на перегруз в обычном понимании нет, потому что, если я правильно вообще все понимаю, consumer — в модели описанной выше — захлебнуться не может. Узким местом, таким образом, является provider. Чтобы этого избежать, у меня бэкэндом стоит Riak, который специально обещает записать все, что ему всунули, а вот уж читать — как получится. До пиковых значений мне добраться не удавалось пока, поэтому я даже и не знаю, что ответить на «как боремся».
В качестве premature optimization я запилил Riak hook на запись, который пишет «разреженный» бакет по данным раз в секунду (из-за того, что первоочередная задача — попасть в сторадж, бывает, что чуть реже, на десятые) и если provider начинает захлебываться, он откатывается на «разреженный» бакет. Но, повторюсь, мы до потолка пока не допрыгивали, и не заметно, что допрыгнем в ближайшее время.
_P. S._ на меня Валим производит впечатление человека, который свалил от DHH именно из-за того, что у него совсем другая парадигма мышления.
То, что вы делаете, можно сделать вообще как угодно. Хоть на каждый чих запускать процесс и слать ему сообщение.
Я говорю о ситуации, когда у нас по 16-24 ядер работают на 80% загрузки и если где-то возникает бесконтрольная передача данных из одной точки в другую, то это место гарантированно взорвется OOM-ом.
При этом перед этим оно успеет положить шедулер, потому что всё таки сборка мусора не бесплатная.
«for exchanging events with back-pressure» — вот это самое важное. Это очень правильные и ценные слова, но как всегда: хочется услышать, как это в бою.
Я знаю, что под gen_server-ом очень много выстраданной боли и даже в нём есть досадные проблемы, которые нельзя устранить (типа удалить запрос от сдохшего клиента). А что тут — интересно.
Я говорим о том, что Experimental.GenStage
в моем конкретном случае (там есть back-pressure и просто запускать процессы не получится, они time consuming) позволил бесплатно [в теории] снять головную боль про «что будет, если завтра я найду во входном канале не тысячу, а миллион реквестов в минуту». То есть ровно про то, для чего он был создан.
Вы же говорите о том, что у вас есть что-то такое, что нагружает N ядер на M процентов, что в теории может вообще нуждаться в ручном маршаллинге, своем шедулере, или своем умном GenServer
е. Я охотно верю, что все задачи в мире GenStage
не решит, но конкретно мою — контролировать количество и успех обработчиков — он решил. Как-то так.
Потому что все жизненно важное находится за консумером, который может от провайдера либо получить дополнительные N сущностей, либо упереться в мертвого/полудохлого провайдера, что не так страшно. Ад, который вы описываете с OOM, утянутым на дно шедулером и маршем Шопена, изолирован на уровне «за пределами бизнес логики».
Идея и реализация настолько просты, что я просто не понимаю, где оно может поломаться. Аналогия примерно такая: мы накрыли бомбу чугунной ванной. Мы не решили проблему взрыва, но мы гарантировали себе отсутствие разрушений вокруг.
Распределенные вычисления в Elixir — классический пример MapReduce