Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et: 

 

# Copyright 2016-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org> 

# 

# This file is part of qutebrowser. 

# 

# qutebrowser is free software: you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# qutebrowser is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>. 

 

"""Evaluation of PAC scripts.""" 

 

import sys 

import functools 

 

from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot, QUrl 

from PyQt5.QtNetwork import (QNetworkProxy, QNetworkRequest, QHostInfo, 

QNetworkReply, QNetworkAccessManager, 

QHostAddress) 

from PyQt5.QtQml import QJSEngine, QJSValue 

 

from qutebrowser.utils import log, utils, qtutils 

 

 

class ParseProxyError(Exception): 

 

"""Error while parsing PAC result string.""" 

 

pass 

 

 

class EvalProxyError(Exception): 

 

"""Error while evaluating PAC script.""" 

 

pass 

 

 

def _js_slot(*args): 

"""Wrap a methods as a JavaScript function. 

 

Register a PACContext method as a JavaScript function, and catch 

exceptions returning them as JavaScript Error objects. 

 

Args: 

args: Types of method arguments. 

 

Return: Wrapped method. 

""" 

def _decorator(method): 

@functools.wraps(method) 

def new_method(self, *args, **kwargs): 

"""Call the underlying function.""" 

try: 

return method(self, *args, **kwargs) 

except: 

e = str(sys.exc_info()[0]) 

log.network.exception("PAC evaluation error") 

# pylint: disable=protected-access 

return self._error_con.callAsConstructor([e]) 

# pylint: enable=protected-access 

return pyqtSlot(*args, result=QJSValue)(new_method) 

return _decorator 

 

 

class _PACContext(QObject): 

 

"""Implementation of PAC API functions that require native calls. 

 

See https://developer.mozilla.org/en-US/docs/Mozilla/Projects/Necko/Proxy_Auto-Configuration_(PAC)_file 

""" 

 

JS_DEFINITIONS = """ 

function dnsResolve(host) { 

return PAC.dnsResolve(host); 

} 

 

function myIpAddress() { 

return PAC.myIpAddress(); 

} 

""" 

 

def __init__(self, engine): 

"""Create a new PAC API implementation instance. 

 

Args: 

engine: QJSEngine which is used for running PAC. 

""" 

super().__init__(parent=engine) 

self._engine = engine 

self._error_con = engine.globalObject().property("Error") 

 

@_js_slot(str) 

def dnsResolve(self, host): 

"""Resolve a DNS hostname. 

 

Resolves the given DNS hostname into an IP address, and returns it 

in the dot-separated format as a string. 

 

Args: 

host: hostname to resolve. 

""" 

ips = QHostInfo.fromName(host) 

if ips.error() != QHostInfo.NoError or not ips.addresses(): 

err_f = "Failed to resolve host during PAC evaluation: {}" 

log.network.info(err_f.format(host)) 

return QJSValue(QJSValue.NullValue) 

else: 

return ips.addresses()[0].toString() 

 

@_js_slot() 

def myIpAddress(self): 

"""Get host IP address. 

 

Return the server IP address of the current machine, as a string in 

the dot-separated integer format. 

""" 

return QHostAddress(QHostAddress.LocalHost).toString() 

 

 

class PACResolver: 

 

"""Evaluate PAC script files and resolve proxies.""" 

 

@staticmethod 

def _parse_proxy_host(host_str): 

host, _colon, port_str = host_str.partition(':') 

try: 

port = int(port_str) 

except ValueError: 

raise ParseProxyError("Invalid port number") 

return (host, port) 

 

@staticmethod 

def _parse_proxy_entry(proxy_str): 

"""Parse one proxy string entry, as described in PAC specification.""" 

config = [c.strip() for c in proxy_str.split(' ') if c] 

if not config: 

raise ParseProxyError("Empty proxy entry") 

elif config[0] == "DIRECT": 

if len(config) != 1: 

raise ParseProxyError("Invalid number of parameters for " + 

"DIRECT") 

return QNetworkProxy(QNetworkProxy.NoProxy) 

elif config[0] == "PROXY": 

if len(config) != 2: 

raise ParseProxyError("Invalid number of parameters for PROXY") 

host, port = PACResolver._parse_proxy_host(config[1]) 

return QNetworkProxy(QNetworkProxy.HttpProxy, host, port) 

elif config[0] in ["SOCKS", "SOCKS5"]: 

if len(config) != 2: 

raise ParseProxyError("Invalid number of parameters for SOCKS") 

host, port = PACResolver._parse_proxy_host(config[1]) 

return QNetworkProxy(QNetworkProxy.Socks5Proxy, host, port) 

else: 

err = "Unknown proxy type: {}" 

raise ParseProxyError(err.format(config[0])) 

 

@staticmethod 

def _parse_proxy_string(proxy_str): 

proxies = proxy_str.split(';') 

return [PACResolver._parse_proxy_entry(x) for x in proxies] 

 

def _evaluate(self, js_code, js_file): 

ret = self._engine.evaluate(js_code, js_file) 

if ret.isError(): 

err = "JavaScript error while evaluating PAC file: {}" 

raise EvalProxyError(err.format(ret.toString())) 

 

def __init__(self, pac_str): 

"""Create a PAC resolver. 

 

Args: 

pac_str: JavaScript code containing PAC resolver. 

""" 

self._engine = QJSEngine() 

 

self._ctx = _PACContext(self._engine) 

self._engine.globalObject().setProperty( 

"PAC", self._engine.newQObject(self._ctx)) 

self._evaluate(_PACContext.JS_DEFINITIONS, "pac_js_definitions") 

self._evaluate(utils.read_file("javascript/pac_utils.js"), "pac_utils") 

proxy_config = self._engine.newObject() 

proxy_config.setProperty("bindings", self._engine.newObject()) 

self._engine.globalObject().setProperty("ProxyConfig", proxy_config) 

 

self._evaluate(pac_str, "pac") 

global_js_object = self._engine.globalObject() 

self._resolver = global_js_object.property("FindProxyForURL") 

if not self._resolver.isCallable(): 

err = "Cannot resolve FindProxyForURL function, got '{}' instead" 

raise EvalProxyError(err.format(self._resolver.toString())) 

 

def resolve(self, query, from_file=False): 

"""Resolve a proxy via PAC. 

 

Args: 

query: QNetworkProxyQuery. 

from_file: Whether the proxy info is coming from a file. 

 

Return: 

A list of QNetworkProxy objects in order of preference. 

""" 

if from_file: 

string_flags = QUrl.PrettyDecoded 

else: 

string_flags = QUrl.RemoveUserInfo 

if query.url().scheme() == 'https': 

string_flags |= QUrl.RemovePath | QUrl.RemoveQuery 

 

result = self._resolver.call([query.url().toString(string_flags), 

query.peerHostName()]) 

result_str = result.toString() 

if not result.isString(): 

err = "Got strange value from FindProxyForURL: '{}'" 

raise EvalProxyError(err.format(result_str)) 

return self._parse_proxy_string(result_str) 

 

 

class PACFetcher(QObject): 

 

"""Asynchronous fetcher of PAC files.""" 

 

finished = pyqtSignal() 

 

def __init__(self, url, parent=None): 

"""Resolve a PAC proxy from URL. 

 

Args: 

url: QUrl of a PAC proxy. 

""" 

super().__init__(parent) 

 

pac_prefix = "pac+" 

 

assert url.scheme().startswith(pac_prefix) 

url.setScheme(url.scheme()[len(pac_prefix):]) 

 

self._pac_url = url 

self._manager = QNetworkAccessManager() 

self._manager.setProxy(QNetworkProxy(QNetworkProxy.NoProxy)) 

self._pac = None 

self._error_message = None 

self._reply = None 

 

def __eq__(self, other): 

# pylint: disable=protected-access 

return self._pac_url == other._pac_url 

 

def __repr__(self): 

return utils.get_repr(self, url=self._pac_url, constructor=True) 

 

def fetch(self): 

"""Fetch the proxy from the remote URL.""" 

self._reply = self._manager.get(QNetworkRequest(self._pac_url)) 

self._reply.finished.connect(self._finish) 

 

@pyqtSlot() 

def _finish(self): 

if self._reply.error() != QNetworkReply.NoError: 

error = "Can't fetch PAC file from URL, error code {}: {}" 

self._error_message = error.format( 

self._reply.error(), self._reply.errorString()) 

log.network.error(self._error_message) 

else: 

try: 

pacscript = bytes(self._reply.readAll()).decode("utf-8") 

except UnicodeError as e: 

error = "Invalid encoding of a PAC file: {}" 

self._error_message = error.format(e) 

log.network.exception(self._error_message) 

try: 

self._pac = PACResolver(pacscript) 

log.network.debug("Successfully evaluated PAC file.") 

except EvalProxyError as e: 

error = "Error in PAC evaluation: {}" 

self._error_message = error.format(e) 

log.network.exception(self._error_message) 

self._manager = None 

self._reply = None 

self.finished.emit() 

 

def _wait(self): 

"""Wait until a reply from the remote server is received.""" 

if self._manager is not None: 

loop = qtutils.EventLoop() 

self.finished.connect(loop.quit) 

loop.exec_() 

 

def fetch_error(self): 

"""Check if PAC script is successfully fetched. 

 

Return None iff PAC script is downloaded and evaluated successfully, 

error string otherwise. 

""" 

self._wait() 

return self._error_message 

 

def resolve(self, query): 

"""Resolve a query via PAC. 

 

Args: QNetworkProxyQuery. 

 

Return a list of QNetworkProxy objects in order of preference. 

""" 

self._wait() 

from_file = self._pac_url.scheme() == 'file' 

try: 

return self._pac.resolve(query, from_file=from_file) 

except (EvalProxyError, ParseProxyError) as e: 

log.network.exception("Error in PAC resolution: {}.".format(e)) 

# .invalid is guaranteed to be inaccessible in RFC 6761. 

# Port 9 is for DISCARD protocol -- DISCARD servers act like 

# /dev/null. 

# Later NetworkManager.createRequest will detect this and display 

# an error message. 

error_host = "pac-resolve-error.qutebrowser.invalid" 

return [QNetworkProxy(QNetworkProxy.HttpProxy, error_host, 9)]