Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et: 

 

# Copyright 2014-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org> 

# 

# This file is part of qutebrowser. 

# 

# qutebrowser is free software: you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# qutebrowser is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>. 

 

"""Our own QNetworkAccessManager.""" 

 

import collections 

import html 

 

import attr 

from PyQt5.QtCore import (pyqtSlot, pyqtSignal, QCoreApplication, QUrl, 

QByteArray) 

from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QSslSocket 

 

from qutebrowser.config import config 

from qutebrowser.utils import message, log, usertypes, utils, objreg, urlutils 

from qutebrowser.browser import shared 

from qutebrowser.browser.webkit import certificateerror 

from qutebrowser.browser.webkit.network import (webkitqutescheme, networkreply, 

filescheme) 

 

 

# Error string passed to ErrorNetworkReply by NetworkManager.createRequest
# when a request is refused by the host blocker.
HOSTBLOCK_ERROR_STRING = '%HOSTBLOCK%'

# Module-level cache of answers to proxy authentication prompts, keyed by
# ProxyId; read and filled by NetworkManager.on_proxy_authentication_required.
_proxy_auth_cache = {}

 

 

@attr.s(frozen=True)
class ProxyId:

    """Frozen record identifying a single proxy server.

    Instances are built from a proxy's type(), hostName() and port() in
    NetworkManager.on_proxy_authentication_required and serve as keys of
    the module-level _proxy_auth_cache dictionary.
    """

    # The value returned by the proxy's type() method.
    type = attr.ib()
    # The value returned by the proxy's hostName() method.
    hostname = attr.ib()
    # The value returned by the proxy's port() method.
    port = attr.ib()

 

 

def _is_secure_cipher(cipher):
    """Tell whether an SSL cipher (hopefully) isn't broken yet.

    Args:
        cipher: The cipher to inspect. It is queried through its name(),
                usedBits(), keyExchangeMethod(), encryptionMethod() and
                authenticationMethod() methods.

    Return:
        False as soon as one of the weaknesses checked below matches,
        True if none of them does.
    """
    # The cipher name is split on '-' and upper-cased so that individual
    # components (e.g. 'MD5', 'EXP') can be matched exactly.
    name_parts = [part.upper() for part in cipher.name().split('-')]

    if cipher.usedBits() < 128:
        # https://codereview.qt-project.org/#/c/75943/
        return False

    # OpenSSL should already protect against this in a better way
    if cipher.keyExchangeMethod() == 'DH' and utils.is_windows:
        # https://weakdh.org/
        return False

    encryption = cipher.encryptionMethod()
    if encryption.upper().startswith(('RC4', 'DES')):
        # RC4:
        #   http://en.wikipedia.org/wiki/RC4#Security
        #   https://codereview.qt-project.org/#/c/148906/
        # DES:
        #   http://en.wikipedia.org/wiki/Data_Encryption_Standard#Security_and_cryptanalysis
        return False

    if 'MD5' in name_parts:
        # http://www.win.tue.nl/hashclash/rogue-ca/
        return False

    # A check for CBC ciphers on pre-TLS protocols (POODLE,
    # http://en.wikipedia.org/wiki/POODLE) is deliberately not done here:
    # OpenSSL should already protect against this in a better way.

    ### The remaining cases should never happen as those are already
    ### filtered out by either the SSL libraries or Qt - but let's be sure.

    if cipher.authenticationMethod() in ('aNULL', 'NULL'):
        # Ciphers without authentication.
        return False

    if encryption in ('eNULL', 'NULL'):
        # Ciphers without encryption.
        return False

    if 'EXP' in name_parts or 'EXPORT' in name_parts:
        # Weak export-grade ciphers
        return False

    # 'ADH' ciphers offer no MITM protection; everything that made it this
    # far without being one of them is what *should* happen ;)
    return 'ADH' not in name_parts

 

 

def init():
    """Disable insecure SSL ciphers on old Qt versions.

    Reads Qt's default cipher list, sorts every cipher into a "secure" or
    "insecure" bucket via _is_secure_cipher(), logs both the full list and
    the rejected ciphers, and installs the secure ones as the new default.
    """
    qt_defaults = QSslSocket.defaultCiphers()
    default_names = ', '.join(c.name() for c in qt_defaults)
    log.init.debug("Default Qt ciphers: {}".format(default_names))

    secure, insecure = [], []
    for candidate in qt_defaults:
        # One classification call per cipher, keeping Qt's original order
        # inside each bucket.
        bucket = secure if _is_secure_cipher(candidate) else insecure
        bucket.append(candidate)

    insecure_names = ', '.join(c.name() for c in insecure)
    log.init.debug("Disabling bad ciphers: {}".format(insecure_names))
    QSslSocket.setDefaultCiphers(secure)

 

 

class NetworkManager(QNetworkAccessManager):

    """Our own QNetworkAccessManager.

    Attributes:
        adopted_downloads: If downloads are running with this QNAM but the
                           associated tab gets closed already, the NAM gets
                           reparented to the DownloadManager. This counts the
                           still running downloads, so the QNAM can clean
                           itself up when this reaches zero again.
        _scheme_handlers: A dictionary (scheme -> handler) of supported custom
                          schemes.
        _win_id: The window ID this NetworkManager is associated with.
                 (or None for generic network managers)
        _tab_id: The tab ID this NetworkManager is associated with.
                 (or None for generic network managers)
        _rejected_ssl_errors: A dict of rejected errors, mapping the result
                              of urlutils.host_tuple() for the request URL
                              to a list of CertificateErrorWrapper objects.
        _accepted_ssl_errors: A dict of accepted errors, with the same
                              layout as _rejected_ssl_errors.
        _private: Whether we're in private browsing mode.
        netrc_used: Whether netrc authentication was performed.

    Signals:
        shutting_down: Emitted when the QNAM is shutting down.
    """

    shutting_down = pyqtSignal()

    def __init__(self, *, win_id, tab_id, private, parent=None):
        """Set up cookie jar, cache, scheme handlers and signal handlers.

        Args:
            win_id: The window ID this manager belongs to (or None).
            tab_id: The tab ID this manager belongs to (or None).
            private: Whether we're in private browsing mode.
            parent: The parent passed on to QNetworkAccessManager.
        """
        log.init.debug("Initializing NetworkManager")
        with log.disable_qt_msghandler():
            # WORKAROUND for a hang when a message is printed - See:
            # http://www.riverbankcomputing.com/pipermail/pyqt/2014-November/035045.html
            super().__init__(parent)
        log.init.debug("NetworkManager init done")
        self.adopted_downloads = 0
        self._win_id = win_id
        self._tab_id = tab_id
        self._private = private
        self._scheme_handlers = {
            'qute': webkitqutescheme.handler,
            'file': filescheme.handler,
        }
        self._set_cookiejar()
        self._set_cache()
        self.sslErrors.connect(self.on_ssl_errors)
        self._rejected_ssl_errors = collections.defaultdict(list)
        self._accepted_ssl_errors = collections.defaultdict(list)
        self.authenticationRequired.connect(self.on_authentication_required)
        self.proxyAuthenticationRequired.connect(
            self.on_proxy_authentication_required)
        self.netrc_used = False

    def _set_cookiejar(self):
        """Set the cookie jar of the NetworkManager correctly."""
        if self._private:
            cookie_jar = objreg.get('ram-cookie-jar')
        else:
            cookie_jar = objreg.get('cookie-jar')

        # We have a shared cookie jar - we restore its parent so we don't
        # take ownership of it.
        self.setCookieJar(cookie_jar)
        app = QCoreApplication.instance()
        cookie_jar.setParent(app)

    def _set_cache(self):
        """Set the cache of the NetworkManager correctly.

        Nothing is set when in private browsing mode.
        """
        if self._private:
            return
        # We have a shared cache - we restore its parent so we don't take
        # ownership of it.
        app = QCoreApplication.instance()
        cache = objreg.get('cache')
        self.setCache(cache)
        cache.setParent(app)

    def _get_abort_signals(self, owner=None):
        """Get a list of signals which should abort a question.

        Args:
            owner: An optional object whose destroyed signal should abort
                   the question as well.

        Return:
            A list of signals.
        """
        abort_on = [self.shutting_down]
        if owner is not None:
            abort_on.append(owner.destroyed)
        # This might be a generic network manager, e.g. one belonging to a
        # DownloadManager. In this case, just skip the webview thing.
        if self._tab_id is not None:
            assert self._win_id is not None
            tab = objreg.get('tab', scope='tab', window=self._win_id,
                             tab=self._tab_id)
            abort_on.append(tab.load_started)
        return abort_on

    def shutdown(self):
        """Abort all running requests."""
        self.setNetworkAccessible(QNetworkAccessManager.NotAccessible)
        self.shutting_down.emit()

    # No @pyqtSlot here, see
    # https://github.com/qutebrowser/qutebrowser/issues/2213
    def on_ssl_errors(self, reply, errors):  # noqa: C901 pragma: no mccabe
        """Decide if SSL errors should be ignored or not.

        This slot is called on SSL/TLS errors by the self.sslErrors signal.

        Errors which were accepted/rejected before for the same host tuple
        are handled without asking again; otherwise the decision is
        delegated to shared.ignore_certificate_errors() and remembered.

        Args:
            reply: The QNetworkReply that is encountering the errors.
            errors: A list of errors.
        """
        errors = [certificateerror.CertificateErrorWrapper(e) for e in errors]
        log.webview.debug("Certificate errors: {!r}".format(
            ' / '.join(str(err) for err in errors)))
        try:
            host_tpl = urlutils.host_tuple(reply.url())
        except ValueError:
            # No usable key for this URL - nothing can be looked up or
            # remembered for it.
            host_tpl = None
            is_accepted = False
            is_rejected = False
        else:
            is_accepted = set(errors).issubset(
                self._accepted_ssl_errors[host_tpl])
            is_rejected = set(errors).issubset(
                self._rejected_ssl_errors[host_tpl])

        log.webview.debug("Already accepted: {} / "
                          "rejected {}".format(is_accepted, is_rejected))

        if is_rejected:
            return
        elif is_accepted:
            reply.ignoreSslErrors()
            return

        abort_on = self._get_abort_signals(reply)
        ignore = shared.ignore_certificate_errors(reply.url(), errors,
                                                  abort_on=abort_on)
        if ignore:
            reply.ignoreSslErrors()
            err_dict = self._accepted_ssl_errors
        else:
            err_dict = self._rejected_ssl_errors
        if host_tpl is not None:
            err_dict[host_tpl] += errors

    def clear_all_ssl_errors(self):
        """Clear all remembered SSL errors."""
        self._accepted_ssl_errors.clear()
        self._rejected_ssl_errors.clear()

    @pyqtSlot(QUrl)
    def clear_rejected_ssl_errors(self, url):
        """Clear the rejected SSL errors on a reload.

        The remembered errors are stored under urlutils.host_tuple(url) by
        on_ssl_errors, so the same key has to be derived here - using the
        QUrl itself as key would never match any entry.

        Args:
            url: The URL to remove.
        """
        try:
            host_tpl = urlutils.host_tuple(url)
        except ValueError:
            # on_ssl_errors never remembers anything for such URLs, so
            # there is nothing to clear.
            return
        self._rejected_ssl_errors.pop(host_tpl, None)

    @pyqtSlot('QNetworkReply*', 'QAuthenticator*')
    def on_authentication_required(self, reply, authenticator):
        """Called when a website needs authentication.

        netrc authentication is tried once per NetworkManager; if it was
        already tried or did not succeed, the user is asked instead.
        """
        netrc_success = False
        if not self.netrc_used:
            self.netrc_used = True
            netrc_success = shared.netrc_authentication(reply.url(),
                                                        authenticator)
        if not netrc_success:
            abort_on = self._get_abort_signals(reply)
            shared.authentication_required(reply.url(), authenticator,
                                           abort_on=abort_on)

    @pyqtSlot('QNetworkProxy', 'QAuthenticator*')
    def on_proxy_authentication_required(self, proxy, authenticator):
        """Called when a proxy needs authentication.

        Credentials entered before for the same proxy (type, host name and
        port) are reused from the module-level _proxy_auth_cache.
        """
        proxy_id = ProxyId(proxy.type(), proxy.hostName(), proxy.port())
        if proxy_id in _proxy_auth_cache:
            user, password = _proxy_auth_cache[proxy_id]
            authenticator.setUser(user)
            authenticator.setPassword(password)
        else:
            # Both values come from the network, so escape them before
            # embedding them into the rich-text prompt.
            msg = '<b>{}</b> says:<br/>{}'.format(
                html.escape(proxy.hostName()),
                html.escape(authenticator.realm()))
            abort_on = self._get_abort_signals()
            answer = message.ask(
                title="Proxy authentication required", text=msg,
                mode=usertypes.PromptMode.user_pwd, abort_on=abort_on)
            if answer is not None:
                authenticator.setUser(answer.user)
                authenticator.setPassword(answer.password)
                _proxy_auth_cache[proxy_id] = answer

    @pyqtSlot()
    def on_adopted_download_destroyed(self):
        """Check if we can clean up if an adopted download was destroyed.

        See the description for adopted_downloads for details.
        """
        self.adopted_downloads -= 1
        log.downloads.debug("Adopted download destroyed, {} left.".format(
            self.adopted_downloads))
        assert self.adopted_downloads >= 0
        if self.adopted_downloads == 0:
            self.deleteLater()

    @pyqtSlot(object)  # DownloadItem
    def adopt_download(self, download):
        """Adopt a new DownloadItem."""
        self.adopted_downloads += 1
        log.downloads.debug("Adopted download, {} adopted.".format(
            self.adopted_downloads))
        download.destroyed.connect(self.on_adopted_download_destroyed)
        download.adopt_download.connect(self.adopt_download)

    def set_referer(self, req, current_url):
        """Set the referer header.

        Args:
            req: The request to modify.
            current_url: The URL the request is compared against for the
                         'same-domain' setting.
        """
        referer_header_conf = config.val.content.headers.referer

        try:
            if referer_header_conf == 'never':
                # Note: using ''.encode('ascii') sends a header with no value,
                # instead of no header at all
                req.setRawHeader('Referer'.encode('ascii'), QByteArray())
            elif (referer_header_conf == 'same-domain' and
                  not urlutils.same_domain(req.url(), current_url)):
                req.setRawHeader('Referer'.encode('ascii'), QByteArray())
            # If referer_header_conf is set to 'always', we leave the header
            # alone as QtWebKit did set it.
        except urlutils.InvalidUrlError:
            # req.url() or current_url can be invalid - this happens on
            # https://www.playstation.com/ for example.
            pass

    # WORKAROUND for:
    # http://www.riverbankcomputing.com/pipermail/pyqt/2014-September/034806.html
    #
    # By returning False, we provoke a TypeError because of a wrong return
    # type, which does *not* trigger a segfault but invoke our return handler
    # immediately.
    @utils.prevent_exceptions(False)
    def createRequest(self, op, req, outgoing_data):
        """Return a new QNetworkReply object.

        Args:
             op: Operation op
             req: const QNetworkRequest & req
             outgoing_data: QIODevice * outgoingData

        Return:
            A QNetworkReply.
        """
        proxy_factory = objreg.get('proxy-factory', None)
        if proxy_factory is not None:
            proxy_error = proxy_factory.get_error()
            if proxy_error is not None:
                return networkreply.ErrorNetworkReply(
                    req, proxy_error, QNetworkReply.UnknownProxyError,
                    self)

        scheme = req.url().scheme()
        if scheme in self._scheme_handlers:
            result = self._scheme_handlers[scheme](req)
            # A handler returning None means the request falls through to
            # the regular handling below.
            if result is not None:
                result.setParent(self)
                return result

        for header, value in shared.custom_headers():
            req.setRawHeader(header, value)

        host_blocker = objreg.get('host-blocker')
        if host_blocker.is_blocked(req.url()):
            log.webview.info("Request to {} blocked by host blocker.".format(
                req.url().host()))
            return networkreply.ErrorNetworkReply(
                req, HOSTBLOCK_ERROR_STRING, QNetworkReply.ContentAccessDenied,
                self)

        # There are some scenarios where we can't figure out current_url:
        # - There's a generic NetworkManager, e.g. for downloads
        # - The download was in a tab which is now closed.
        current_url = QUrl()

        if self._tab_id is not None:
            assert self._win_id is not None
            try:
                tab = objreg.get('tab', scope='tab', window=self._win_id,
                                 tab=self._tab_id)
                current_url = tab.url()
            except (KeyError, RuntimeError):
                # https://github.com/qutebrowser/qutebrowser/issues/889
                # Catching RuntimeError because we could be in the middle of
                # the webpage shutdown here.
                current_url = QUrl()

        self.set_referer(req, current_url)
        return super().createRequest(op, req, outgoing_data)