Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

"""Utilities for plugins discovery and selection.""" 

import collections
import itertools
import logging
from collections import OrderedDict

try:
    from collections.abc import Mapping  # Python 3.3+
except ImportError:  # pragma: no cover
    from collections import Mapping  # Python 2 fallback

import pkg_resources
import six
import zope.interface
import zope.interface.exceptions
import zope.interface.verify

from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
from certbot import interfaces

 

 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

 

 

class PluginEntryPoint(object):
    """Plugin entry point.

    Wraps a single setuptools entry point.  The plugin class is loaded
    eagerly in the constructor, while instantiation (:meth:`init`) and
    preparation (:meth:`prepare`) happen on demand and are memoized.

    """

    PREFIX_FREE_DISTRIBUTIONS = [
        "certbot",
        "certbot-apache",
        "certbot-dns-cloudflare",
        "certbot-dns-cloudxns",
        "certbot-dns-digitalocean",
        "certbot-dns-dnsimple",
        "certbot-dns-dnsmadeeasy",
        "certbot-dns-gehirn",
        "certbot-dns-google",
        "certbot-dns-linode",
        "certbot-dns-luadns",
        "certbot-dns-nsone",
        "certbot-dns-ovh",
        "certbot-dns-rfc2136",
        "certbot-dns-route53",
        "certbot-dns-sakuracloud",
        "certbot-nginx",
        "certbot-postfix",
    ]
    """Distributions for which prefix will be omitted."""

    # this object is mutable, don't allow it to be hashed!
    __hash__ = None  # type: ignore

    def __init__(self, entry_point):
        """Load the plugin class referenced by ``entry_point``.

        :param entry_point: object exposing ``name``, ``dist.key``,
            ``load()`` and ``require()`` (a setuptools entry point).

        """
        self.name = self.entry_point_to_plugin_name(entry_point)
        self.plugin_cls = entry_point.load()
        self.entry_point = entry_point
        # Plugin instance created by init(); None until then.
        self._initialized = None
        # Outcome of prepare(): None (not attempted), True (success) or
        # the exception instance that prepare() caught.
        self._prepared = None

    @classmethod
    def entry_point_to_plugin_name(cls, entry_point):
        """Unique plugin name for an ``entry_point``.

        Entry points from a distribution listed in
        ``PREFIX_FREE_DISTRIBUTIONS`` keep their bare name; all others
        are prefixed with ``"<distribution key>:"``.

        """
        if entry_point.dist.key in cls.PREFIX_FREE_DISTRIBUTIONS:
            return entry_point.name
        return entry_point.dist.key + ":" + entry_point.name

    @property
    def description(self):
        """Description of the plugin."""
        return self.plugin_cls.description

    @property
    def description_with_name(self):
        """Description with name. Handy for UI."""
        return "{0} ({1})".format(self.description, self.name)

    @property
    def long_description(self):
        """Long description of the plugin.

        Falls back to :attr:`description` when the plugin class does
        not define ``long_description``.

        """
        try:
            return self.plugin_cls.long_description
        except AttributeError:
            return self.description

    @property
    def hidden(self):
        """Should this plugin be hidden from UI?"""
        return getattr(self.plugin_cls, "hidden", False)

    def ifaces(self, *ifaces_groups):
        """Does plugin implements specified interface groups?

        Each positional argument is an iterable of interfaces.  The
        result is true when no groups are given, or when the plugin
        class implements *every* interface of *at least one* group.

        """
        return not ifaces_groups or any(
            all(iface.implementedBy(self.plugin_cls)
                for iface in ifaces)
            for ifaces in ifaces_groups)

    @property
    def initialized(self):
        """Has the plugin been initialized already?"""
        return self._initialized is not None

    def init(self, config=None):
        """Memoized plugin initialization.

        On the first call the entry point's requirements are resolved
        and the plugin class is instantiated with ``(config, name)``.
        Later calls return the same instance and ignore ``config``.

        """
        if not self.initialized:
            self.entry_point.require()  # fetch extras!
            self._initialized = self.plugin_cls(config, self.name)
        return self._initialized

    def verify(self, ifaces):
        """Verify that the plugin conforms to the specified interfaces.

        The plugin must already be initialized.

        :param ifaces: iterable of interfaces to verify against.
        :returns: ``True`` if the plugin instance verifies against all
            of ``ifaces``, ``False`` as soon as one verification fails.

        """
        assert self.initialized
        for iface in ifaces:  # zope.interface.providedBy(plugin)
            try:
                zope.interface.verify.verifyObject(iface, self.init())
            # Every verification failure raised by verifyObject derives
            # from Invalid (BrokenImplementation,
            # BrokenMethodImplementation, DoesNotImplement, ...).
            # Catching only BrokenImplementation would let the others
            # escape and abort plugin filtering instead of rejecting
            # this one plugin.
            except zope.interface.exceptions.Invalid as error:
                # Only worth logging when the class claims to implement
                # the interface; otherwise failure is expected.
                if iface.implementedBy(self.plugin_cls):
                    logger.debug(
                        "%s implements %s but object does not verify: %s",
                        self.plugin_cls, iface.__name__, error, exc_info=True)
                return False
        return True

    @property
    def prepared(self):
        """Has the plugin been prepared already?

        True once :meth:`prepare` has been attempted, whether it
        succeeded or stored an error.

        """
        if not self.initialized:
            logger.debug(".prepared called on uninitialized %r", self)
        return self._prepared is not None

    def prepare(self):
        """Memoized plugin preparation.

        The plugin must already be initialized.

        :returns: ``True`` if the plugin's ``prepare()`` succeeded,
            otherwise the caught exception instance
            (``MisconfigurationError``, ``NoInstallationError`` or
            ``PluginError``).  Other exception types propagate.

        """
        assert self.initialized
        if self._prepared is None:
            try:
                self._initialized.prepare()
            except errors.MisconfigurationError as error:
                logger.debug("Misconfigured %r: %s", self, error, exc_info=True)
                self._prepared = error
            except errors.NoInstallationError as error:
                logger.debug(
                    "No installation (%r): %s", self, error, exc_info=True)
                self._prepared = error
            except errors.PluginError as error:
                logger.debug("Other error:(%r): %s", self, error, exc_info=True)
                self._prepared = error
            else:
                self._prepared = True
        return self._prepared

    @property
    def misconfigured(self):
        """Is plugin misconfigured?"""
        return isinstance(self._prepared, errors.MisconfigurationError)

    @property
    def problem(self):
        """Return the Exception raised during plugin setup, or None if all is well"""
        if isinstance(self._prepared, Exception):
            return self._prepared
        return None

    @property
    def available(self):
        """Is plugin available, i.e. prepared or misconfigured?"""
        return self._prepared is True or self.misconfigured

    def __repr__(self):
        return "PluginEntryPoint#{0}".format(self.name)

    def __str__(self):
        """Multi-line, human-readable summary of the plugin."""
        lines = [
            "* {0}".format(self.name),
            "Description: {0}".format(self.plugin_cls.description),
            "Interfaces: {0}".format(", ".join(
                iface.__name__ for iface in zope.interface.implementedBy(
                    self.plugin_cls))),
            "Entry point: {0}".format(self.entry_point),
        ]

        # init()/prepare() are memoized, so calling them here only
        # reports the stored results.
        if self.initialized:
            lines.append("Initialized: {0}".format(self.init()))
            if self.prepared:
                lines.append("Prep: {0}".format(self.prepare()))

        return "\n".join(lines)

 

 

class PluginsRegistry(Mapping):
    """Plugins registry.

    A read-only mapping from plugin name to plugin entry point object,
    iterated in sorted name order.  The filtering methods return a new
    registry of the same type and leave this one untouched.

    """

    def __init__(self, plugins):
        """Build a registry from a ``name -> plugin entry point`` mapping."""
        # plugins are sorted so the same order is used between runs.
        # This prevents deadlock caused by plugins acquiring a lock
        # and ensures at least one concurrent Certbot instance will run
        # successfully.
        self._plugins = OrderedDict(sorted(plugins.items()))

    @classmethod
    def find_all(cls):
        """Find plugins using setuptools entry points.

        Both the current and the old entry point groups are scanned.
        Entry points whose loaded class does not provide
        ``IPluginFactory`` are skipped with a warning.

        """
        plugins = {}  # type: Dict[str, PluginEntryPoint]
        # pylint: disable=not-callable
        entry_points = itertools.chain(
            pkg_resources.iter_entry_points(
                constants.SETUPTOOLS_PLUGINS_ENTRY_POINT),
            pkg_resources.iter_entry_points(
                constants.OLD_SETUPTOOLS_PLUGINS_ENTRY_POINT),)
        for entry_point in entry_points:
            plugin_ep = PluginEntryPoint(entry_point)
            # Two entry points mapping to the same plugin name would
            # silently shadow each other.
            assert plugin_ep.name not in plugins, (
                "PREFIX_FREE_DISTRIBUTIONS messed up")
            # providedBy | pylint: disable=no-member
            if interfaces.IPluginFactory.providedBy(plugin_ep.plugin_cls):
                plugins[plugin_ep.name] = plugin_ep
            else:  # pragma: no cover
                logger.warning(
                    "%r does not provide IPluginFactory, skipping", plugin_ep)
        return cls(plugins)

    def __getitem__(self, name):
        return self._plugins[name]

    def __iter__(self):
        return iter(self._plugins)

    def __len__(self):
        return len(self._plugins)

    def init(self, config):
        """Initialize all plugins in the registry.

        :returns: list of the values returned by each entry's
            ``init(config)``, in registry order.

        """
        return [plugin_ep.init(config) for plugin_ep
                in self._plugins.values()]

    def filter(self, pred):
        """Filter plugins based on predicate.

        :param pred: callable taking a plugin entry point and returning
            a truthy value for entries to keep.
        :returns: new registry (same type as ``self``) with the kept
            entries.

        """
        return type(self)({name: plugin_ep for name, plugin_ep
                           in self._plugins.items() if pred(plugin_ep)})

    def visible(self):
        """Filter plugins based on visibility."""
        return self.filter(lambda plugin_ep: not plugin_ep.hidden)

    def ifaces(self, *ifaces_groups):
        """Filter plugins based on interfaces."""
        # pylint: disable=star-args
        return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))

    def verify(self, ifaces):
        """Filter plugins based on verification."""
        return self.filter(lambda p_ep: p_ep.verify(ifaces))

    def prepare(self):
        """Prepare all plugins in the registry.

        :returns: list of the values returned by each entry's
            ``prepare()``, in registry order.

        """
        return [plugin_ep.prepare() for plugin_ep in self._plugins.values()]

    def available(self):
        """Filter plugins based on availability."""
        return self.filter(lambda p_ep: p_ep.available)
        # successfully prepared + misconfigured

    def find_init(self, plugin):
        """Find an initialized plugin.

        This is particularly useful for finding a name for the plugin
        (although `.IPluginFactory.__call__` takes ``name`` as one of
        the arguments, ``IPlugin.name`` is not part of the interface)::

          # plugin is an instance providing IPlugin, initialized
          # somewhere else in the code
          plugin_registry.find_init(plugin).name

        Returns ``None`` if ``plugin`` is not found in the registry.

        """
        # use list instead of set because PluginEntryPoint is not hashable
        candidates = [plugin_ep for plugin_ep in self._plugins.values()
                      if plugin_ep.initialized and plugin_ep.init() is plugin]
        # Identity comparison: at most one entry can own the instance.
        assert len(candidates) <= 1
        return candidates[0] if candidates else None

    def __repr__(self):
        return "{0}({1})".format(
            self.__class__.__name__, ','.join(
                repr(p_ep) for p_ep in self._plugins.values()))

    def __str__(self):
        if not self._plugins:
            return "No plugins"
        return "\n\n".join(str(p_ep) for p_ep in self._plugins.values())