Coverage for qutebrowser/misc/httpclient.py : 30%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
# Copyright 2014-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org> # # This file is part of qutebrowser. # # qutebrowser is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # qutebrowser is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
QNetworkReply)
"""An HTTP client based on QNetworkAccessManager.
Intended for APIs, automatically decodes data.
Attributes: _nam: The QNetworkAccessManager used. _timers: A {QNetworkReply: QTimer} dict.
Signals: success: Emitted when the operation succeeded. arg: The received data. error: Emitted when the request failed. arg: The error message, as string. """
"""Create a new POST request.
Args: url: The URL to post to, as QUrl. data: A dict of data to send. """ if data is None: data = {} encoded_data = urllib.parse.urlencode(data).encode('utf-8') request = QNetworkRequest(url) request.setHeader(QNetworkRequest.ContentTypeHeader, 'application/x-www-form-urlencoded;charset=utf-8') reply = self._nam.post(request, encoded_data) self._handle_reply(reply)
"""Create a new GET request.
Emits success/error when done.
Args: url: The URL to access, as QUrl. """ request = QNetworkRequest(url) reply = self._nam.get(request) self._handle_reply(reply)
"""Handle a new QNetworkReply.""" if reply.isFinished(): self.on_reply_finished(reply) else: timer = QTimer(self) timer.setInterval(10000) timer.timeout.connect(reply.abort) timer.start() self._timers[reply] = timer reply.finished.connect(functools.partial( self.on_reply_finished, reply))
"""Read the data and finish when the reply finished.
Args: reply: The QNetworkReply which finished. """ timer = self._timers.pop(reply) if timer is not None: timer.stop() timer.deleteLater() if reply.error() != QNetworkReply.NoError: self.error.emit(reply.errorString()) return try: data = bytes(reply.readAll()).decode('utf-8') except UnicodeDecodeError: self.error.emit("Invalid UTF-8 data received in reply!") return self.success.emit(data) |