Coverage for qutebrowser/commands/userscripts.py : 72%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
# Copyright 2014-2018 Florian Bruhin (The Compiler) <mail@qutebrowser.org> # # This file is part of qutebrowser. # # qutebrowser is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # qutebrowser is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
"""A FIFO reader based on a QSocketNotifier.
Attributes: _filepath: The path to the opened FIFO. _fifo: The Python file object for the FIFO. _notifier: The QSocketNotifier used.
Signals: got_line: Emitted when a whole line arrived. """
# We open as R/W so we never get EOF and thus never have to reopen the pipe. # See http://www.outflux.net/blog/archives/2008/03/09/using-select-on-a-fifo/ # We also use os.open and os.fdopen rather than built-in open so we # can add O_NONBLOCK. # pylint: disable=no-member,useless-suppression # pylint: enable=no-member,useless-suppression
def read_line(self): """(Try to) read a line from the FIFO.""" .format(e)) except RuntimeError as e: # For unknown reasons, read_line can still get called after the # QSocketNotifier was already deleted... log.procs.debug("While reading userscript output: {}".format(e))
"""Clean up so the FIFO can be closed.""" self.got_line.emit(line.rstrip('\r\n'))
"""Common part between the Windows and the POSIX userscript runners.
Attributes: _filepath: The path of the file/FIFO which is being read. _proc: The GUIProcess which is being executed. _cleaned_up: Whether temporary files were cleaned up. _text_stored: Set when the page text was stored async. _html_stored: Set when the page html was stored async. _args: Arguments to pass to _run_process. _kwargs: Keyword arguments to pass to _run_process.
Signals: got_cmd: Emitted when a new command arrived and should be executed. finished: Emitted when the userscript finished running. """
"""Called as callback when the text is ready from the web backend.""" suffix='.txt', delete=False) as txt_file:
"""Called as callback when the html is ready from the web backend.""" suffix='.html', delete=False) as html_file:
"""Start the given command.
Args: cmd: The command to be started. *args: The arguments to hand to the command env: A dictionary of environment variables to add. verbose: Show notifications when the command started/exited. """ additional_env=self._env, verbose=verbose, parent=self)
"""Clean up temporary files.""" return # NOTE: Do not replace this with "raise CommandError" as it's # executed async. fn, e))
"""Prepare running the userscript given.
Needs to be overridden by subclasses. The script will actually run after store_text and store_html have been called.
Args: Passed to _run_process. """ raise NotImplementedError
def on_proc_finished(self):
    """Handle the process having finished.

    Abstract hook: needs to be overridden by subclasses.

    Raises:
        NotImplementedError when not overridden.
    """
    raise NotImplementedError()
def on_proc_error(self):
    """Handle the process having encountered an error.

    Abstract hook: needs to be overridden by subclasses.

    Raises:
        NotImplementedError when not overridden.
    """
    raise NotImplementedError()
"""Userscript runner to be used on POSIX. Uses _QtFIFOReader.
Commands are executed immediately when they arrive in the FIFO.
Attributes: _reader: The _QtFIFOReader instance. """
# tempfile.mktemp is deprecated and discouraged, but we use it here # to create a FIFO since the only other alternative would be to # create a directory and place the FIFO there, which sucks. Since # os.mkfifo will raise an exception anyway when the path already # exists, it shouldn't be a big issue. dir=standarddir.runtime()) # pylint: disable=no-member,useless-suppression # pylint: enable=no-member,useless-suppression except OSError as e: message.error("Error while creating FIFO: {}".format(e)) return
def on_proc_finished(self):
def on_proc_error(self):
"""Clean up reader and temporary files."""
"""Userscript runner to be used on Windows.
This is a much dumber implementation than _POSIXUserscriptRunner. It uses a normal flat file for commands and executes them all at once when the process has finished, as Windows doesn't really understand the concept of using files as named pipes.
This also means the userscript *has* to use >> (append) rather than > (overwrite) to write to the file! """
"""Clean up temporary files after the userscript finished."""
log.procs.exception("Failed to read command file!") .format(e))
def on_proc_error(self):
def on_proc_finished(self): """Read back the commands when the process finished."""
except OSError as e: message.error("Error while creating tempfile: {}".format(e)) return
"""Base class for userscript exceptions."""
"""Raised when spawning a userscript that doesn't exist.
Attributes: script_name: name of the userscript as called paths: path names that were searched for the userscript """
super().__init__() self.script_name = script_name self.paths = paths
msg = "Userscript '{}' not found".format(self.script_name) if self.paths: msg += " in userscript directories {}".format( ', '.join(repr(path) for path in self.paths)) return msg
"""Raised when userscripts aren't supported on this platform."""
"""Search userscript directories for given command.
Raises: NotFoundError if the command could not be found.
Args: cmd: The command to look for.
Returns: A path to the userscript. """ directories = [ os.path.join(standarddir.data(), "userscripts"), os.path.join(standarddir.data(system=True), "userscripts"), ] for directory in directories: cmd_path = os.path.join(directory, cmd) if os.path.exists(cmd_path): return cmd_path
raise NotFoundError(cmd, directories)
"""Run a userscript after dumping page html/source.
Raises: UnsupportedError if userscripts are not supported on the current platform. NotFoundError if the command could not be found.
Args: tab: The WebKitTab/WebEngineTab to get the source from. cmd: The userscript binary to run. *args: The arguments to pass to the userscript. win_id: The window id the userscript is executed in. env: A dictionary of variables to add to the process environment. verbose: Show notifications when the command started/exited. """ window=win_id)
runner = _POSIXUserscriptRunner(tabbed_browser) elif utils.is_windows: # pragma: no cover runner = _WindowsUserscriptRunner(tabbed_browser) else: # pragma: no cover raise UnsupportedError
runner.got_cmd.connect( lambda cmd: log.commands.debug("Got userscript command: {}".format(cmd))) runner.got_cmd.connect(commandrunner.run_safely) user_agent = config.val.content.headers.user_agent if user_agent is not None: env['QUTE_USER_AGENT'] = user_agent
env['QUTE_CONFIG_DIR'] = standarddir.config() env['QUTE_DATA_DIR'] = standarddir.data() env['QUTE_DOWNLOAD_DIR'] = downloads.download_dir() env['QUTE_COMMANDLINE_TEXT'] = objreg.get('status-command', scope='window', window=win_id).text()
cmd_path = os.path.expanduser(cmd)
# if cmd is not given as an absolute path, look it up in # ~/.local/share/qutebrowser/userscripts (or $XDG_DATA_HOME) if not os.path.isabs(cmd_path): log.misc.debug("{} is no absolute path".format(cmd_path)) cmd_path = _lookup_path(cmd) elif not os.path.exists(cmd_path): raise NotFoundError(cmd_path) log.misc.debug("Userscript to run: {}".format(cmd_path))
runner.finished.connect(commandrunner.deleteLater) runner.finished.connect(runner.deleteLater)
runner.prepare_run(cmd_path, *args, env=env, verbose=verbose) tab.dump_async(runner.store_html) tab.dump_async(runner.store_text, plain=True) return runner |