Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et: 

 

# Copyright 2014-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org> 

# 

# This file is part of qutebrowser. 

# 

# qutebrowser is free software: you can redistribute it and/or modify 

# it under the terms of the GNU General Public License as published by 

# the Free Software Foundation, either version 3 of the License, or 

# (at your option) any later version. 

# 

# qutebrowser is distributed in the hope that it will be useful, 

# but WITHOUT ANY WARRANTY; without even the implied warranty of 

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

# GNU General Public License for more details. 

# 

# You should have received a copy of the GNU General Public License 

# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>. 

 

"""Misc. utilities related to Qt. 

 

Module attributes: 

MAXVALS: A dictionary of C/Qt types (as string) mapped to their maximum 

value. 

MINVALS: A dictionary of C/Qt types (as string) mapped to their minimum 

value. 

""" 

 

 

import io 

import operator 

import contextlib 

 

import pkg_resources 

from PyQt5.QtCore import (qVersion, QEventLoop, QDataStream, QByteArray, 

QIODevice, QSaveFile, QT_VERSION_STR, 

PYQT_VERSION_STR) 

try: 

from PyQt5.QtWebKit import qWebKitVersion 

except ImportError: # pragma: no cover 

qWebKitVersion = None 

 

 

# Maximum value of each supported C/Qt type, keyed by the type name.
MAXVALS = {
    'int': (1 << 31) - 1,
    'int64': (1 << 63) - 1,
}

# Minimum value of each supported C/Qt type, keyed by the type name.
MINVALS = {
    'int': -(1 << 31),
    'int64': -(1 << 63),
}

 

 

class QtOSError(OSError):

    """OSError subclass for failures reported by a QFileDevice.

    Attributes:
        qt_errno: The value of dev.error(), or None if the device has no
                  error() attribute.
    """

    def __init__(self, dev, msg=None):
        """Build the exception from a device.

        Args:
            dev: The device which failed.
            msg: The error message; dev.errorString() is used if None.
        """
        message = dev.errorString() if msg is None else msg
        super().__init__(message)

        try:
            errno_value = dev.error()
        except AttributeError:
            errno_value = None
        self.qt_errno = errno_value

 

 

def version_check(version, exact=False, compiled=True):
    """Check if the available Qt version is the given one or newer.

    The runtime version (qVersion()) is always compared. With compiled=True,
    QT_VERSION_STR and PYQT_VERSION_STR have to pass the same comparison.

    Args:
        version: The version to check against, as a string.
        exact: If set, compare with == instead of >=.
        compiled: Set to False to not check the compiled versions.

    Return:
        True if every checked version passes the comparison, False otherwise.

    Raises:
        ValueError: If both compiled and exact are set.
    """
    # Catch code using the old API, where an operator was passed here.
    legacy_operators = [operator.gt, operator.lt, operator.ge, operator.le,
                        operator.eq]
    assert exact not in legacy_operators, exact
    if compiled and exact:
        raise ValueError("Can't use compiled=True with exact=True!")

    wanted = pkg_resources.parse_version(version)
    compare = operator.eq if exact else operator.ge

    candidates = [qVersion()]
    if compiled:
        # Also check the compiled Qt version and finally the PyQt version.
        candidates += [QT_VERSION_STR, PYQT_VERSION_STR]

    # all() stops at the first failing comparison, so later version strings
    # are only parsed when the earlier ones passed.
    return all(compare(pkg_resources.parse_version(candidate), wanted)
               for candidate in candidates)

 

 

def is_new_qtwebkit():
    """Check whether the available QtWebKit version is newer than 538.1.

    Must only be called when qWebKitVersion could be imported.
    """
    assert qWebKitVersion is not None
    current = pkg_resources.parse_version(qWebKitVersion())
    threshold = pkg_resources.parse_version('538.1')
    return current > threshold

 

 

def check_overflow(arg, ctype, fatal=True):
    """Check if the given argument is in bounds for the given type.

    Args:
        arg: The argument to check.
        ctype: The C/Qt type to check as a string (a key of MAXVALS/MINVALS).
        fatal: Whether to raise exceptions (True) or truncate values (False).

    Return:
        The original argument if it's in bounds, otherwise (with fatal=False)
        the nearest bound of the type.

    Raises:
        OverflowError: If arg is out of bounds and fatal is set.
    """
    upper = MAXVALS[ctype]
    lower = MINVALS[ctype]

    if arg > upper:
        clamped = upper
    elif arg < lower:
        clamped = lower
    else:
        return arg

    # Only reached for out-of-bounds values.
    if fatal:
        raise OverflowError(arg)
    return clamped

 

 

def ensure_valid(obj):
    """Raise QtValueError unless obj.isValid() reports a valid object."""
    if obj.isValid():
        return
    raise QtValueError(obj)

 

 

def check_qdatastream(stream):
    """Raise OSError if the status of the given QDataStream is not Ok.

    The exception message is the description matching the stream's status.
    """
    descriptions = {
        QDataStream.Ok: "The data stream is operating normally.",
        QDataStream.ReadPastEnd: ("The data stream has read past the end of "
                                  "the data in the underlying device."),
        QDataStream.ReadCorruptData: "The data stream has read corrupt data.",
        QDataStream.WriteFailed: ("The data stream cannot write to the "
                                  "underlying device."),
    }
    if stream.status() == QDataStream.Ok:
        return
    raise OSError(descriptions[stream.status()])

 

 

def serialize(obj):
    """Serialize an object into a new QByteArray and return it."""
    buf = QByteArray()
    writer = QDataStream(buf, QIODevice.WriteOnly)
    serialize_stream(writer, obj)
    return buf

 

 

def deserialize(data, obj):
    """Deserialize the contents of a QByteArray into the given object.

    Args:
        data: The QByteArray to read from.
        obj: The object which gets filled from the data.
    """
    reader = QDataStream(data, QIODevice.ReadOnly)
    deserialize_stream(reader, obj)

 

 

def serialize_stream(stream, obj):
    """Serialize an object into a QDataStream.

    The stream status is checked via check_qdatastream (which raises OSError
    on a non-Ok status) both before and after writing.

    Args:
        stream: The QDataStream to write to.
        obj: The object to write into the stream.
    """
    check_qdatastream(stream)
    # The << operator is used for its side effect of writing to the stream.
    stream << obj  # pylint: disable=pointless-statement
    check_qdatastream(stream)

 

 

def deserialize_stream(stream, obj):
    """Deserialize a QDataStream into an object.

    The stream status is checked via check_qdatastream (which raises OSError
    on a non-Ok status) both before and after reading.

    Args:
        stream: The QDataStream to read from.
        obj: The object which gets filled from the stream.
    """
    check_qdatastream(stream)
    # The >> operator is used for its side effect of reading into obj.
    stream >> obj  # pylint: disable=pointless-statement
    check_qdatastream(stream)

 

 

@contextlib.contextmanager
def savefile_open(filename, binary=False, encoding='utf-8'):
    """Context manager to easily use a QSaveFile.

    Args:
        filename: The name of the file to write.
        binary: If set, yield the PyQIODevice itself instead of a text
                wrapper around it.
        encoding: The encoding used for the text wrapper.

    Yields:
        A PyQIODevice (binary=True) or an io.TextIOWrapper around one.

    Raises:
        QtOSError: If opening the file failed, or if committing failed
                   without the write having been cancelled.
    """
    save_file = QSaveFile(filename)
    aborted = False
    try:
        if not save_file.open(QIODevice.WriteOnly):
            raise QtOSError(save_file)

        device = PyQIODevice(save_file)
        if binary:
            handle = device
        else:
            handle = io.TextIOWrapper(device, encoding=encoding)

        yield handle
        handle.flush()
    except BaseException:
        # Whatever went wrong (including errors raised inside the with-block),
        # cancel the write before re-raising.
        save_file.cancelWriting()
        aborted = True
        raise
    finally:
        # commit() is always called; its failure is only reported when the
        # write wasn't cancelled above.
        if not save_file.commit() and not aborted:
            raise QtOSError(save_file, msg="Commit failed!")

 

 

class PyQIODevice(io.BufferedIOBase):

    """Wrapper for a QIODevice which provides a python interface.

    Attributes:
        dev: The underlying QIODevice.
    """

    def __init__(self, dev):
        super().__init__()
        self.dev = dev

    def __len__(self):
        """Return the size reported by the underlying device."""
        return self.dev.size()

    def _check_open(self):
        """Check if the device is open, raise ValueError if not."""
        if not self.dev.isOpen():
            raise ValueError("IO operation on closed device!")

    def _check_random(self):
        """Check if the device supports random access, raise OSError if not."""
        if not self.seekable():
            raise OSError("Random access not allowed!")

    def _check_readable(self):
        """Check if the device is readable, raise OSError if not."""
        if not self.dev.isReadable():
            raise OSError("Trying to read unreadable file!")

    def _check_writable(self):
        """Check if the device is writable, raise OSError if not."""
        if not self.writable():
            raise OSError("Trying to write to unwritable file!")

    def open(self, mode):
        """Open the underlying device and ensure opening succeeded.

        Raises OSError if opening failed.

        Args:
            mode: QIODevice::OpenMode flags.

        Return:
            A contextlib.closing() object so this can be used as
            contextmanager.
        """
        ok = self.dev.open(mode)
        if not ok:
            raise QtOSError(self.dev)
        return contextlib.closing(self)

    def close(self):
        """Close the underlying device."""
        self.dev.close()

    def fileno(self):
        """Not supported, always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the position in the underlying device.

        Args:
            offset: The offset, interpreted relative to whence.
            whence: io.SEEK_SET (absolute), io.SEEK_CUR (relative to the
                    current position) or io.SEEK_END (relative to the size).

        Return:
            The new absolute position, like io.IOBase.seek() does.

        Raises:
            ValueError: If the device is closed.
            OSError: If the device is sequential or seeking failed.
            io.UnsupportedOperation: If whence has an unknown value.
        """
        self._check_open()
        self._check_random()
        if whence == io.SEEK_SET:
            ok = self.dev.seek(offset)
        elif whence == io.SEEK_CUR:
            ok = self.dev.seek(self.tell() + offset)
        elif whence == io.SEEK_END:
            ok = self.dev.seek(len(self) + offset)
        else:
            raise io.UnsupportedOperation("whence = {} is not "
                                          "supported!".format(whence))
        if not ok:
            raise QtOSError(self.dev, msg="seek failed!")
        # Callers such as io.TextIOWrapper pass this value through.
        return self.dev.pos()

    def truncate(self, size=None):  # pylint: disable=unused-argument
        """Not supported, always raises io.UnsupportedOperation."""
        raise io.UnsupportedOperation

    @property
    def closed(self):
        """Whether the underlying device is not open."""
        return not self.dev.isOpen()

    def flush(self):
        """Wait until the underlying device has written its data."""
        self._check_open()
        self.dev.waitForBytesWritten(-1)

    def isatty(self):
        """Return False for an open device, raise ValueError if closed."""
        self._check_open()
        return False

    def readable(self):
        """Return whether the underlying device is readable."""
        return self.dev.isReadable()

    def readline(self, size=-1):
        """Read a line from the underlying device.

        If the device can't currently provide a full line, the remaining
        data (or up to size bytes of it) is read instead.

        Args:
            size: The maximum number of bytes to read; None or a negative
                  value means no limit.

        Return:
            The data which was read.

        Raises:
            ValueError: If the device is closed.
            OSError: If the device is unreadable or reading failed.
        """
        self._check_open()
        self._check_readable()

        if size is None:
            # The io API allows None as an alias for "no limit".
            size = -1

        if size < 0:
            qt_size = 0  # no maximum size
        elif size == 0:
            return QByteArray()
        else:
            qt_size = size + 1  # Qt also counts the NUL byte

        if self.dev.canReadLine():
            buf = self.dev.readLine(qt_size)
        elif size < 0:
            buf = self.dev.readAll()
        else:
            buf = self.dev.read(size)

        if buf is None:
            raise QtOSError(self.dev)
        return buf

    def seekable(self):
        """Return whether the underlying device is non-sequential."""
        return not self.dev.isSequential()

    def tell(self):
        """Return the current position in the underlying device.

        Raises:
            ValueError: If the device is closed.
            OSError: If the device is sequential.
        """
        self._check_open()
        self._check_random()
        return self.dev.pos()

    def writable(self):
        """Return whether the underlying device is writable."""
        return self.dev.isWritable()

    def write(self, b):
        """Write the given data to the underlying device.

        Return:
            The number of bytes written.

        Raises:
            ValueError: If the device is closed.
            OSError: If the device is unwritable, or if the device reported
                     an error or wrote less than len(b) bytes.
        """
        self._check_open()
        self._check_writable()
        num = self.dev.write(b)
        if num == -1 or num < len(b):
            raise QtOSError(self.dev)
        return num

    def read(self, size=-1):
        """Read data from the underlying device.

        Args:
            size: The maximum number of bytes to read; None or a negative
                  value reads everything which is available.

        Return:
            The data which was read.

        Raises:
            ValueError: If the device is closed.
            OSError: If the device is unreadable or reading failed.
        """
        self._check_open()
        self._check_readable()
        if size is None or size < 0:
            buf = self.dev.readAll()
        else:
            buf = self.dev.read(size)
        if buf is None:
            raise QtOSError(self.dev)
        return buf

 

 

class QtValueError(ValueError):

    """Exception which gets raised by ensure_valid.

    Attributes:
        reason: The errorString() of the invalid object, or None if the
                object has no such attribute.
    """

    def __init__(self, obj):
        try:
            self.reason = obj.errorString()
        except AttributeError:
            self.reason = None

        # The reason is only appended when there is a non-empty one.
        parts = ["{} is not valid".format(obj)]
        if self.reason:
            parts.append(": {}".format(self.reason))
        super().__init__("".join(parts))

 

 

class EventLoop(QEventLoop):

    """A thin wrapper around QEventLoop.

    Raises an exception when doing exec_() multiple times.

    Attributes:
        _executing: Whether exec_() is currently running.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self._executing = False

    def exec_(self, flags=QEventLoop.AllEvents):
        """Override exec_ to raise an exception when re-running.

        Args:
            flags: Passed on to QEventLoop.exec_().

        Return:
            The value returned by QEventLoop.exec_().

        Raises:
            AssertionError: If this event loop is already running.
        """
        if self._executing:
            raise AssertionError("Eventloop is already running!")
        self._executing = True
        try:
            # Hand the base class result back instead of dropping it.
            return super().exec_(flags)
        finally:
            # Reset the flag even if exec_() raised, so the loop can be
            # started again later.
            self._executing = False