Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

"""Test utilities. 

 

.. warning:: This module is not part of the public API. 

 

""" 

import os 

import pkg_resources 

import shutil 

import tempfile 

import unittest 

import sys 

import warnings 

from multiprocessing import Process, Event 

 

from cryptography.hazmat.backends import default_backend 

from cryptography.hazmat.primitives import serialization 

import mock 

import OpenSSL 

import josepy as jose 

import six 

from six.moves import reload_module # pylint: disable=import-error 

 

from certbot import constants 

from certbot import interfaces 

from certbot import storage 

from certbot import configuration 

from certbot import lock 

from certbot import util 

 

from certbot.display import util as display_util 

 

 

def vector_path(*names): 

"""Path to a test vector.""" 

return pkg_resources.resource_filename( 

__name__, os.path.join('testdata', *names)) 

 

 

def load_vector(*names): 

"""Load contents of a test vector.""" 

# luckily, resource_string opens file in binary mode 

data = pkg_resources.resource_string( 

__name__, os.path.join('testdata', *names)) 

# Try at most to convert CRLF to LF when data is text 

try: 

return data.decode().replace('\r\n', '\n').encode() 

except ValueError: 

# Failed to process the file with standard encoding. 

# Most likely not a text file, return its bytes untouched. 

return data 

 

 

def _guess_loader(filename, loader_pem, loader_der): 

_, ext = os.path.splitext(filename) 

if ext.lower() == '.pem': 

return loader_pem 

elif ext.lower() == '.der': 

return loader_der 

else: # pragma: no cover 

raise ValueError("Loader could not be recognized based on extension") 

 

 

def load_cert(*names): 

"""Load certificate.""" 

loader = _guess_loader( 

names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1) 

return OpenSSL.crypto.load_certificate(loader, load_vector(*names)) 

 

 

def load_csr(*names): 

"""Load certificate request.""" 

loader = _guess_loader( 

names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1) 

return OpenSSL.crypto.load_certificate_request(loader, load_vector(*names)) 

 

 

def load_comparable_csr(*names): 

"""Load ComparableX509 certificate request.""" 

return jose.ComparableX509(load_csr(*names)) 

 

 

def load_rsa_private_key(*names): 

"""Load RSA private key.""" 

loader = _guess_loader(names[-1], serialization.load_pem_private_key, 

serialization.load_der_private_key) 

return jose.ComparableRSAKey(loader( 

load_vector(*names), password=None, backend=default_backend())) 

 

 

def load_pyopenssl_private_key(*names): 

"""Load pyOpenSSL private key.""" 

loader = _guess_loader( 

names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1) 

return OpenSSL.crypto.load_privatekey(loader, load_vector(*names)) 

 

 

def skip_unless(condition, reason): # pragma: no cover 

"""Skip tests unless a condition holds. 

 

This implements the basic functionality of unittest.skipUnless 

which is only available on Python 2.7+. 

 

:param bool condition: If ``False``, the test will be skipped 

:param str reason: the reason for skipping the test 

 

:rtype: callable 

:returns: decorator that hides tests unless condition is ``True`` 

 

""" 

if hasattr(unittest, "skipUnless"): 

return unittest.skipUnless(condition, reason) 

elif condition: 

return lambda cls: cls 

else: 

return lambda cls: None 

 

 

def make_lineage(config_dir, testfile): 

"""Creates a lineage defined by testfile. 

 

This creates the archive, live, and renewal directories if 

necessary and creates a simple lineage. 

 

:param str config_dir: path to the configuration directory 

:param str testfile: configuration file to base the lineage on 

 

:returns: path to the renewal conf file for the created lineage 

:rtype: str 

 

""" 

lineage_name = testfile[:-len('.conf')] 

 

conf_dir = os.path.join( 

config_dir, constants.RENEWAL_CONFIGS_DIR) 

archive_dir = os.path.join( 

config_dir, constants.ARCHIVE_DIR, lineage_name) 

live_dir = os.path.join( 

config_dir, constants.LIVE_DIR, lineage_name) 

 

for directory in (archive_dir, conf_dir, live_dir,): 

if not os.path.exists(directory): 

os.makedirs(directory) 

 

sample_archive = vector_path('sample-archive') 

for kind in os.listdir(sample_archive): 

shutil.copyfile(os.path.join(sample_archive, kind), 

os.path.join(archive_dir, kind)) 

 

for kind in storage.ALL_FOUR: 

os.symlink(os.path.join(archive_dir, '{0}1.pem'.format(kind)), 

os.path.join(live_dir, '{0}.pem'.format(kind))) 

 

conf_path = os.path.join(config_dir, conf_dir, testfile) 

with open(vector_path(testfile)) as src: 

with open(conf_path, 'w') as dst: 

dst.writelines( 

line.replace('MAGICDIR', config_dir) for line in src) 

 

return conf_path 

 

 

def patch_get_utility(target='zope.component.getUtility'): 

"""Patch zope.component.getUtility to use a special mock IDisplay. 

 

The mock IDisplay works like a regular mock object, except it also 

also asserts that methods are called with valid arguments. 

 

:param str target: path to patch 

 

:returns: mock zope.component.getUtility 

:rtype: mock.MagicMock 

 

""" 

return mock.patch(target, new_callable=_create_get_utility_mock) 

 

 

def patch_get_utility_with_stdout(target='zope.component.getUtility', 

stdout=None): 

"""Patch zope.component.getUtility to use a special mock IDisplay. 

 

The mock IDisplay works like a regular mock object, except it also 

also asserts that methods are called with valid arguments. 

 

The `message` argument passed to the IDisplay methods is passed to 

stdout's write method. 

 

:param str target: path to patch 

:param object stdout: object to write standard output to; it is 

expected to have a `write` method 

 

:returns: mock zope.component.getUtility 

:rtype: mock.MagicMock 

 

""" 

stdout = stdout if stdout else six.StringIO() 

 

freezable_mock = _create_get_utility_mock_with_stdout(stdout) 

return mock.patch(target, new=freezable_mock) 

 

 

class FreezableMock(object): 

"""Mock object with the ability to freeze attributes. 

 

This class works like a regular mock.MagicMock object, except 

attributes and behavior set before the object is frozen cannot 

be changed during tests. 

 

If a func argument is provided to the constructor, this function 

is called first when an instance of FreezableMock is called, 

followed by the usual behavior defined by MagicMock. The return 

value of func is ignored. 

 

""" 

def __init__(self, frozen=False, func=None, return_value=mock.sentinel.DEFAULT): 

self._frozen_set = set() if frozen else {'freeze', } 

self._func = func 

self._mock = mock.MagicMock() 

if return_value != mock.sentinel.DEFAULT: 

self.return_value = return_value 

self._frozen = frozen 

 

def freeze(self): 

"""Freeze object preventing further changes.""" 

self._frozen = True 

 

def __call__(self, *args, **kwargs): 

if self._func is not None: 

self._func(*args, **kwargs) 

return self._mock(*args, **kwargs) 

 

def __getattribute__(self, name): 

if name == '_frozen': 

try: 

return object.__getattribute__(self, name) 

except AttributeError: 

return False 

elif name in ('return_value', 'side_effect',): 

return getattr(object.__getattribute__(self, '_mock'), name) 

elif name == '_frozen_set' or name in self._frozen_set: 

return object.__getattribute__(self, name) 

else: 

return getattr(object.__getattribute__(self, '_mock'), name) 

 

def __setattr__(self, name, value): 

""" Before it is frozen, attributes are set on the FreezableMock 

instance and added to the _frozen_set. Attributes in the _frozen_set 

cannot be changed after the FreezableMock is frozen. In this case, 

they are set on the underlying _mock. 

 

In cases of return_value and side_effect, these attributes are always 

passed through to the instance's _mock and added to the _frozen_set 

before the object is frozen. 

 

""" 

if self._frozen: 

if name in self._frozen_set: 

raise AttributeError('Cannot change frozen attribute ' + name) 

else: 

return setattr(self._mock, name, value) 

 

if name != '_frozen_set': 

self._frozen_set.add(name) 

 

if name in ('return_value', 'side_effect'): 

return setattr(self._mock, name, value) 

 

else: 

return object.__setattr__(self, name, value) 

 

 

def _create_get_utility_mock(): 

display = FreezableMock() 

for name in interfaces.IDisplay.names(): # pylint: disable=no-member 

if name != 'notification': 

frozen_mock = FreezableMock(frozen=True, func=_assert_valid_call) 

setattr(display, name, frozen_mock) 

display.freeze() 

return FreezableMock(frozen=True, return_value=display) 

 

 

def _create_get_utility_mock_with_stdout(stdout): 

def _write_msg(message, *unused_args, **unused_kwargs): 

"""Write to message to stdout. 

""" 

if message: 

stdout.write(message) 

 

def mock_method(*args, **kwargs): 

""" 

Mock function for IDisplay methods. 

""" 

_assert_valid_call(args, kwargs) 

_write_msg(*args, **kwargs) 

 

 

display = FreezableMock() 

for name in interfaces.IDisplay.names(): # pylint: disable=no-member 

if name == 'notification': 

frozen_mock = FreezableMock(frozen=True, 

func=_write_msg) 

setattr(display, name, frozen_mock) 

else: 

frozen_mock = FreezableMock(frozen=True, 

func=mock_method) 

setattr(display, name, frozen_mock) 

display.freeze() 

 

return FreezableMock(frozen=True, return_value=display) 

 

 

def _assert_valid_call(*args, **kwargs): 

assert_args = [args[0] if args else kwargs['message']] 

 

assert_kwargs = {} 

assert_kwargs['default'] = kwargs.get('default', None) 

assert_kwargs['cli_flag'] = kwargs.get('cli_flag', None) 

assert_kwargs['force_interactive'] = kwargs.get('force_interactive', False) 

 

# pylint: disable=star-args 

display_util.assert_valid_call(*assert_args, **assert_kwargs) 

 

 

class TempDirTestCase(unittest.TestCase): 

"""Base test class which sets up and tears down a temporary directory""" 

 

def setUp(self): 

"""Execute before test""" 

self.tempdir = tempfile.mkdtemp() 

 

def tearDown(self): 

"""Execute after test""" 

# On Windows we have various files which are not correctly closed at the time of tearDown. 

# For know, we log them until a proper file close handling is written. 

# Useful for development only, so no warning when we are on a CI process. 

def onerror_handler(_, path, excinfo): 

"""On error handler""" 

if not os.environ.get('APPVEYOR'): # pragma: no cover 

message = ('Following error occurred when deleting the tempdir {0}' 

' for path {1} during tearDown process: {2}' 

.format(self.tempdir, path, str(excinfo))) 

warnings.warn(message) 

shutil.rmtree(self.tempdir, onerror=onerror_handler) 

 

 

class ConfigTestCase(TempDirTestCase): 

"""Test class which sets up a NamespaceConfig object. 

 

""" 

def setUp(self): 

super(ConfigTestCase, self).setUp() 

self.config = configuration.NamespaceConfig( 

mock.MagicMock(**constants.CLI_DEFAULTS) 

) 

self.config.verb = "certonly" 

self.config.config_dir = os.path.join(self.tempdir, 'config') 

self.config.work_dir = os.path.join(self.tempdir, 'work') 

self.config.logs_dir = os.path.join(self.tempdir, 'logs') 

self.config.cert_path = constants.CLI_DEFAULTS['auth_cert_path'] 

self.config.fullchain_path = constants.CLI_DEFAULTS['auth_chain_path'] 

self.config.chain_path = constants.CLI_DEFAULTS['auth_chain_path'] 

self.config.server = "https://example.com" 

 

 

def _handle_lock(event_in, event_out, path): 

""" 

Acquire a file lock on given path, then wait to release it. This worker is coordinated 

using events to signal when the lock should be acquired and released. 

:param multiprocessing.Event event_in: event object to signal when to release the lock 

:param multiprocessing.Event event_out: event object to signal when the lock is acquired 

:param path: the path to lock 

""" 

if os.path.isdir(path): 

my_lock = lock.lock_dir(path) 

else: 

my_lock = lock.LockFile(path) 

try: 

event_out.set() 

assert event_in.wait(timeout=20), 'Timeout while waiting to release the lock.' 

finally: 

my_lock.release() 

 

 

def lock_and_call(callback, path_to_lock): 

""" 

Grab a lock on path_to_lock from a foreign process then execute the callback. 

:param callable callback: object to call after acquiring the lock 

:param str path_to_lock: path to file or directory to lock 

""" 

# Reload certbot.util module to reset internal _LOCKS dictionary. 

reload_module(util) 

 

emit_event = Event() 

receive_event = Event() 

process = Process(target=_handle_lock, args=(emit_event, receive_event, path_to_lock)) 

process.start() 

 

# Wait confirmation that lock is acquired 

assert receive_event.wait(timeout=10), 'Timeout while waiting to acquire the lock.' 

# Execute the callback 

callback() 

# Trigger unlock from foreign process 

emit_event.set() 

 

# Wait for process termination 

process.join(timeout=10) 

assert process.exitcode == 0 

 

 

def skip_on_windows(reason): 

"""Decorator to skip permanently a test on Windows. A reason is required.""" 

def wrapper(function): 

"""Wrapped version""" 

return unittest.skipIf(sys.platform == 'win32', reason)(function) 

return wrapper 

 

 

def broken_on_windows(function): 

"""Decorator to skip temporarily a broken test on Windows.""" 

reason = 'Test is broken and ignored on windows but should be fixed.' 

return unittest.skipIf( 

sys.platform == 'win32' 

and os.environ.get('SKIP_BROKEN_TESTS_ON_WINDOWS', 'true') == 'true', 

reason)(function) 

 

 

def temp_join(path): 

""" 

Return the given path joined to the tempdir path for the current platform 

Eg.: 'cert' => /tmp/cert (Linux) or 'C:\\Users\\currentuser\\AppData\\Temp\\cert' (Windows) 

""" 

return os.path.join(tempfile.gettempdir(), path)