Pull to refresh

Реализация небольшого асинхронного сервера

Reading time9 min
Views21K
Целью публикации данного топика является представление аудитории Хабрахабра кода небольшого асинхронного сервера, написанного на Питоне с использованием практически «голых» сокетов.

Мне приходилось писать достаточно много приложений, работающих в качестве сетевых сервисов. Эти сервисы были написаны на разных языках, под разную нагрузку и каждый раз реализация нового сервиса чем-то отличалась от предыдущей. Под хабракатом я привожу пример довольно удачной, на мой вгляд, реализации «учебного» сервера, сопровождая код своими комментариями по мере необходимости.

Вводное слово


Итак, в начале несколько слов о том, что собственно делает сервер. Так как он является учебным примером, его задача достаточно проста: при запуске создать сокет, прослушивающий порт; при подключении клиента начать передавать ему информацию. В качестве информации клиенты получают нескончаемый поток цифр числа Пи, вычисляемых на сервере. Сервер генерирует цифры только тогда, когда к нему подключен хотя бы один клиент, запоминая уже сгенерированную последовательность. При отключении всех клиентов генерация приостанавливается, сервер ждет нового подключения.

Реализация сервера является однопоточной. Преимуществом такого подхода является отсутствие дополнительного оверхеда в виде отдельного потока или даже отдельного экземпляра приложения, который бы возник при использовании других часто применяемых техник.

По большому счету это отличие трудно назвать определяющим, т.к. Питон все-таки является интерпретируемым языком и вносит свои ограничения на область использования, однако в перспективе «сохраненные» при такой архитектуре потоки мы сможем использовать в приложении для выполнения каких-либо распараллеленных вычислительных задач.

Перед тем как продолжить, настоятельно рекомендую скачать код и бегло с ним ознакомиться. Также код можно посмотреть онлайн на Шоумикод.
Для запуска сервера пользователи UNIX-подобных систем могут выполнить следующие команды:
chmod +x ./pypi.py
./pypi.py

Для подключения к серверу можно использовать такую команду:
telnet 127.0.0.1 45067


Разбор кода


Ядром приложения является функция main(), запускающая цикл диспетчеризации модуля asyncore:
def main():
    try:
        asyncore.loop(0.1True, Server('127.0.0.1'))
    except KeyboardInterrupt:
        print '\nBye :-*'
        sys.exit()
 
if __name__ == '__main__':
    main()
 

Модуль asyncore предоставляет функцию loop, принимающую четыре опциональных аргумента: (1) таймаут, (2) флаг предпочтения механизма poll обычному select, (3) словарь дескрипторов сокетов, (4) количество итераций. Важное значение для нас имеет третий параметр, которым мы передали функции свежесозданный объект сервера.

Благодаря «особой питоновской магии» класс Server пронаследован как от класса dispatcher из модуля asyncore, так и от класса словаря dict, что позволяет ему выступать одновременно сокетом сервера и хранилищем дескрипторов сокетов всех подключенных клиентов.

Начало объявления класса Server выглядит таким образом:
class Server(dispatcher, dict):
    writable = lambda x: False
 
    def __init__(self, host = None, port = 0xB00B):
        dispatcher.__init__(self)
 
        self.create_socket(AF_INET, SOCK_STREAM)
        dict.__init__(self{self.fileno()self})
 
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(0xA)
 
        self.dataSource = PiGenerator()

В конструкторе объект сначала инициализируется как обработчик серверного сокета, а затем как словарь, состоящий из одной записи — дескриптора сокета самого сервера, указывающего на объект сервера. Важно, что сокет создается функцией create_socket до инициализации словаря, т.к. до создания сокета мы не смогли бы получить его дескриптор. Затем происходит привязка серверного сокета к порту на указанном хосте, запускается прослушка и создается генератор цифр числа Пи, который в дальнейшем будет использоваться для генерации потока данных клиенту.

После того, как цикл диспетчеризации запущен основная доля работы ложится на модуль asyncore, который при подключении нового клиента вызовет метод handle_accept объекта сервера для обработки входящего подключения:
    def handle_accept(self):
        sock, (host, port) = self.accept()
        print 'new client from %s:%d' % (host, port)
 
        stream = Stream(self.dataSource)        
        self[sock.fileno()] = Client(sock, self, stream)

Внутри метода-обработчика происходит непосредственно принятие нового клиента с помощью функции accept, которая возвращает вновь созданный сокет для общения с клиентом и пару хост-порт, с которых произошло подключение. Получив сокет клиента сервер создает новый поток данных (реализуемый классом Stream) для чтения данных из генератора. После этого к списку клиентов добавляется новый объект клиента, инициализируемый только что созданным потоком данных.

Чтение данных клиентом из потока осуществляется внутри метода writable():
    def writable(self):
        data = self.stream.read()
        if data == None:
            print 'client finished reading'
            self.close()
            return False
 
        self.buffer += data
        return len(self.buffer) > 

Метод writable вызывается модулем asyncore для каждого сокета перед очередной итерацией цикла диспетчеризации, чтобы узнать необходимо ли проверять для данного сокета доступость на запись. Мы пользуемся этим, чтобы попробовать прочитать данные из потока и сообщить о необходимости записи, если данные в потоке есть. Если поток возвращает None это значит, что в нем больше не будет данных и сокет закрывается. В данном примере такой ситуации проиcходить не должно, т.к. цифры генерируются бесконечно.

Узнав о доступности операции записи для клиентского сокета asyncore вызывает метод handle_write(), который отправляет данные, ранее прочитанные из потока, через сокет:
    def handle_write(self):                
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]

Генератор и поток тесно связаны между собой реализуя паттерн наблюдателя. Генератор выступает наблюдаемым объектом и предоставляет методы subscribe и unsubscribe соответственно для подписки на события и отписки от них:
class PiGenerator(list):    
    def subscribe(self, obj):  
        self.lock.acquire()
        try:     
            self.append(obj)
            self._notify(obj=obj)
        finally:
            self.lock.release()            
 
        if not self.calculator:
            self.calculator = PiCalcThread(selfself.lock)
            self.calculator.start()
        else:
            if len(self) > :
                self._resumeCalculator()
 
    def unsubscribe(self, obj):
        self.lock.acquire()
        self.remove(obj)   
        self.lock.release()
 
        if len(self) <:
            self._pauseCalculator()

Непосредственно генерация цифр реализована отдельным классом PiCalcThread, который производит вычисления в отдельном потоке, поэтому для синхронизации операций добавления и удаления подписки используется механизм блокировки. С помощью того же объекта блокировки осуществляется приостановка потока при отсутствии подписавшихся потоков. При подписке нового потока с помощью метода _notify() ему отправляются цифры, посчитанные и сохраненные до этого, если он является не первым подписавшимся потоком.

Метод _notify() пробегает по подписанным потокам и вызывает метод update передавая в поток новые цифры:
    def _notify(self, digits = None, obj = None):
        objs = [obj] if obj else self
        digits = digits or self.digits
 
        for obj in objs:
            obj.update(digits)

Метод update() потока просто добавляет новые данные к уже имеющимся:
    def update(self, data):
        self.data += data

Класс потока генерации цифр числа Пи использует алгоритм, предложенный Джереми Гиббонсом (Jeremy Gibbons) в документе Unbounded Spigot Algorithm for the Digits of Pi:
class PiCalcThread(Thread):
    def __init__(self, buffer, lock):
        Thread.__init__(self)
        self.buffer = buffer
        self.lock = lock
 
    def run(self):
        q,r,t,k,n,l = 1,,1,1,3,3
 
        while True:
            if 4*q+r-t < n*t:
                self.lock.acquire()
                self.buffer.newDigits(str(n))
                self.lock.release()
 
                q,r,t,k,n,l = (10*q,10*(r-n*t),t,k,(10*(3*q+r))/t-10*n,l)
            else:
                q,r,t,k,n,l = (q*k,(2*q+r)*l,t*l,k+1,(q*(7*k+2)+r*l)/(t*l),l+2)
 
            time.sleep(0.001)

Метод run() бесконечно вычисляет очередную цифру числа Пи и опять же с использованием блокировки сообщает ее генератору, который в свою очередь передает ее потокам данных, подписавшимся на генератор. Искусственная задержка добавлена для того, чтобы сервер не генерировал слишком большой поток данных в единицу времени.

Для подсветки синтаксиса при подготовке материала использовался ресурс http://highlight.hohli.com/. Очень надеюсь, что кому-то данная статья окажется полезной, хотя описание и получилось сумбурным при достаточно большом объеме.
Tags:
Hubs:
Total votes 46: ↑43 and ↓3+40
Comments26

Articles