Twisted Conch. Передача потока. SCP implementation
Я тут http://ru-python.livejournal.com/305657.html спрашивал как через twisted.conch влить в стандартный ввод удалённого процесса большой поток/файл (по SSH).
Все примеры на приём. А на отдачу нет. Основная проблема - как бы хочется чтобы это вливание не блокировало остальные задачи и реагировало на "хватит лить, буфера заполнены".
Сделал рабочий пример передачи файлов по SCP (scp implementation on twisted.conch), вдруг кому пригодится
Кстати, статья http://blogs.oracle.com/janp/entry/how_the_scp_protocol_works#remote_to_remote кривая. Там афтар не удосужился проверить. Пришлось разбирать код rcp.
Все примеры на приём. А на отдачу нет. Основная проблема - как бы хочется чтобы это вливание не блокировало остальные задачи и реагировало на "хватит лить, буфера заполнены".
Сделал рабочий пример передачи файлов по SCP (scp implementation on twisted.conch), вдруг кому пригодится
Кстати, статья http://blogs.oracle.com/janp/entry/how_the_scp_protocol_works#remote_to_remote кривая. Там афтар не удосужился проверить. Пришлось разбирать код rcp.
#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright (c) 2011 Phil Kulin http://diphost.ru """ Simple SCP transfer inplementation for examples This program is free software. It comes without any warranty, to the extent permitted by applicable law. You can redistribute it and/or modify it under the terms of the Do What The Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See text below and http://sam.zoy.org/wtfpl/COPYING for more details DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE Version 2, December 2004 Copyright (C) 2004 Sam Hocevar <sam@hocevar.net> Everyone is permitted to copy and distribute verbatim or modified copies of this license document, and changing it is allowed as long as the name is changed. DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION 0. You just DO WHAT THE FUCK YOU WANT TO. """ from twisted.conch.client import connect, default, options from twisted.conch.ssh import connection, common, channel from twisted.internet import reactor import sys, os, os.path class ClientOptions(options.ConchOptions): synopsis = """Usage: scp.py [options] file [user@]host[:path] """ optParameters = [['buffersize', 'B', 2 ** 16, 'Size of the buffer to use for sending/receiving.']] def parseArgs(self, LocalFile, host): self['remotePath'] = '' if ':' in host: host, self['remotePath'] = host.split(':', 1) self['remotePath'].rstrip('/') if '@' in host: options['user'], options['host'] = host.split('@',1) if not options['user']: raise if not options['port']: options['port'] = 22 else: options['port'] = int(options['port']) self['localFile'] = LocalFile class SSHConnection(connection.SSHConnection): """ Класс соединения. Цепляет переписанный нами класс канала SSH """ def serviceStarted(self): self.openChannel(sourceSCPChannel(conn = self)) class sourceSCPChannel(channel.SSHChannel): """ Класс канала SSH. Единственная задача канала - передать файл. Код не содержит обработки ситуаций и ошибок. Пример несамодостаточен """ name = 'session' def openFailed(self, reason): print 'echo failed', reason def channelOpen(self, ignoredData): """ Открытие канала. Выставление начальных значений""" self.infile = None # будет объектом файлового дескриптора файла, который надо передать self.insize = 0 # размер файла. участвует в сообщениях протокола self.stream = 0 # флаг "идёт передача" self.stage = 0 # стадии протокола # дать запрос на запуск scp. недокументированный ключ -t включает режим "приёма" в заданную директорию d = self.conn.sendRequest(self, 'exec', common.NS('scp -t '+self.conn.options['remotePath']), wantReply = 1) d.addCallback(self._cbRequest) def _cbRequest(self, ignored): """ Функция вызывается при успешном запуске scp на той стороне""" print("remote SCP started") self.CHUNK_SIZE = self.conn.options['buffersize'] # собственно, по сколько считывать, это же значение предельно для буфера self.stage = 1 # стадия 1. ждём подтверждения от удалённого scp def closed(self): print("Connection closed") reactor.stop() def dataReceived(self, data): """ Обработка пришедших данных. Используется для приёма "подтверждений" от удалённого scp """ if self.stage == 1 and ord(data[0]) == 0: # стадия один подтверждена. начать стадию 2 self.stage = 2 self.insize = os.stat(self.conn.options['localFile']).st_size # послать служебное сообщение self.write("C0644 %d %s\n" % (self.insize,os.path.basename(self.conn.options['localFile']))) elif self.stage == 2 and ord(data[0]) == 0: # стадия 2 подтверждена. удалённая сторона подтвердила приём служебного сообщения self.stage = 3 # открыть файл и начать передачу (стадия 3) self.infile = open(self.conn.options['localFile'], 'rb') self.WriteChunk() elif self.stage == 3 and ord(data[0]) == 0: # стадия 3 - передача файла - подтверждена. закрыть соединение print("Transfer completed") self.loseConnection() else: print("unknown stage (%d) %s" %(self.stage, repr(data))) self.loseConnection() def closeReceived(self): """ Приняли запрос на закрытие соединения """ print('remote side closed %s' % self) def WriteChunk(self): """ Чтение файла блоками и передача их по каналу Для контроля цепочки вызовов служит флаг self.stream, который как бы говорит "есть что читать" """ chunk = '' if self.infile: chunk = self.infile.read(self.CHUNK_SIZE) if not chunk: # нечего прочесть из файла, прекратить передачу потока self.infile.close() self.stream = 0 # дописать в конец 0-байт, на который должна отреагировать удалённая сторона # собственно, по ответу на него мы поймём что всё передано self.write(chr(0)) else: self.stream = 1 self.write(chunk) def write(self,data): """ Переопределение метода. Единственной задачей является контроль внутреннего буфера программной реализации канала. Это даёт возможность не буферизировать весь файл в памяти и создать дискретное последовательное чтение файла Основная проблема в том, что в реализации библиотеки нет обратной связи, сообщающей "могу ещё писать". Сколько бы мы не дали оригинальному методу - он всё загонит в буфер и потом потихоньку его передаст. Поэтому приходится пристраиваться в "хвост". """ channel.SSHChannel.write(self,data) # если буфер меньше "окна чтения файла", то можно ещё вызвать функцию чтения файла if len(self.buf) < self.CHUNK_SIZE: # если конечно есть что читать if self.stream: self.WriteChunk() if __name__ == "__main__": args = sys.argv[1:] options = ClientOptions() options.parseOptions(args) conn = SSHConnection() conn.options = options connect.connect(options['host'], options['port'], options, default.verifyHostKey, default.SSHUserAuthClient(options['user'], options, conn)) reactor.run()
