Pull to refresh

MapReduce для начинающих на Erlang'e

Reading time 3 min
Views 4.5K
Я продолжаю свое погружение в Эрланг. Уже есть хитрый план переписать один из наших сервисов для мониторинга на Эрланге. Мы тут осваиваем облака Windows Azure и Amazon EC2 в качестве платформы для некоторых продуктов и внутренних задач типа QA, поэтому возможность использовать много ядер и машин без переписывания кода выглядить перспективно.

Итак, для начала простой, но реальный пример — есть проект ~2000 файлов. Надо составить список используемых переменных окружения. То есть найти вхождения строк «getenv(...)» и «GetVariable(...)» (это наш wrapper) и выдрать из них параметр.

Задача незамысловатая и давно решается программой на C++, которая даже обход каталогов не делает, а просто вызывает юниксовый «find», генерирующий список файлов по маске, и затем по списку лопатит файлы. На 2000 файлах работает пару секунд в один поток.

Теперь Эрланг. Тут хочется замутить что-нибудь более кучерявое, чем последовательный обход файлов. MapReduce как раз в тему — можно составить список файлов, затем анализ каждого файла делать параллельно (Map), аккумулируя найденных имена переменных, и в конце обработать все полученные входждение (Reduce), в нашем случае просто подсчитать количество вхождения каждой переменной.

Фактически мой код повторяет пример из "Programming Erlang" и использует модуль phofs (parallel higher-order functions) из этой же книги.

    -module(find_variables).
    -export([main/0, find_variables_in_file/2, process_found_variables/3]).

    -define(PATH, "/Projects/interesting_project").
    -define(MASK, "\\..*(cpp|c)").

    main() ->
        io:format("Creating list of files...~n", []),
        % Стандартная функция обхода файловой системы. Последний параметр -
        % функтор, накапливающий имена в списке.
        Files = filelib:fold_files(?PATH, ?MASK, true, 
                                   fun(N, A) -> [N | A] end, []),
        io:format("Found ~b file(s)~n", [length(Files)]),
        F1 = fun find_variables_in_file/2,   % Map
        F2 = fun process_found_variables/3,  % Reduce
        % Вызываем MapReduce через функцию benchmark, считающую время
        % выполнения.
        benchmark(fun() -> 
            L = phofs:mapreduce(F1, F2, [], Files),
            io:format("Found ~b variable(s)~n", [length(L)])
        end, "MapReduce").

    benchmark(Worker, Title) ->
        {T, _} = timer:tc(fun() -> Worker() end),
        io:format("~s: ~f sec(s)~n", [Title, T/1000000]).

    -define(REGEXP, "(getenv|GetVariable)\s*\\(\s*\"([^\"]+)\"\s*\\)").

    % Map. Анализ одного файла.
    find_variables_in_file(Pid, FileName) ->
        case file:open(FileName, [read]) of 
            {ok, File} ->
                % Заранее компилируем регулярное выражение.
                {_, RE} = re:compile(?REGEXP),
                % Данный обратный вызов пошлет родительскому контролирующему
                % потому сообщение с именем найденной переменной.
                CallBack = fun(Var) -> Pid ! {Var, 1} end,
                find_variable_in_file(File, RE, CallBack),
                file:close(File);
            {error, Reason} ->
                io:format("Unable to process '~s', ~p~n", [FileName, Reason]),
                exit(1)
        end.

    % Reduce. Анализ данных. Данная функция вызывается контролирующим 
    % процессом MapReduce для каждого найденного ключа вместе со списком
    % значений, ассоциированных с ним. В нашем случае это будут пары 
    % {VarName, 1}. Мы просто подсчитаем для каждого VarName количество 
    % пришедших пар, то есть количество найденных вхождений этой переменной.
    % Это и есть наш незамысловатый анализ.

    process_found_variables(Key, Vals, A) ->
        [{Key, length(Vals)} | A].

    % Построчный обход файла.
    find_variable_in_file(File, RE, CallBack) ->
        case io:get_line(File, "") of
           eof -> void;
           Line -> 
             scan_line_in_file(Line, RE, CallBack),
             find_variable_in_file(File, RE, CallBack)
        end.

    % Поиск строки в строке по регулярному выражению (скомпилированному ранее),
    % и в случае нахождение вызов CallBack с передачей ему имени найденной
    % переменной.
    scan_line_in_file(Line, RE, CallBack) ->
        case re:run(Line, RE) of
            {match, Captured} -> 
                [_, _, {NameP, NameL}] = Captured,
                Name = string:substr(Line, NameP + 1, NameL),
                CallBack(Name);
            nomatch -> void
        end.

Для сборки программы нужен модуль phofs. Он является универсальным, независимым от конкретных функций Map и Reduce.

И Makefile на всякий случай:

    target = find_variables

    all:
        erlc $(target).erl
        erlc phofs.erl
        erl -noshell -s $(target) main -s init stop

    clean:
        -rm *.beam *.dump


Пузомерка. Как я уже сказал, программа на C++ вместе со временем вызова «find» на моей машине работает 1-2 секунды. Версия на Erlang'e работает ~20 секунд. Плохо? Смотря как посмотреть. Если анализ каждого файла будет более длительным (то есть программа будет основное время тратить на анализ файла, а не обход каталогов), то тут уже не совсем очевидно, какое из решений будет более практично при увеличении числа файлов и сложности анализа.

Я новичок в Эрланге, поэтому будут признателен за критику кода.

Посты по теме:
Tags:
Hubs:
+26
Comments 14
Comments Comments 14

Articles