warnings.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. """Python part of the warnings subsystem."""
  2. import sys
  3. __all__ = ["warn", "warn_explicit", "showwarning",
  4. "formatwarning", "filterwarnings", "simplefilter",
  5. "resetwarnings", "catch_warnings"]
  6. def showwarning(message, category, filename, lineno, file=None, line=None):
  7. """Hook to write a warning to a file; replace if you like."""
  8. msg = WarningMessage(message, category, filename, lineno, file, line)
  9. _showwarnmsg_impl(msg)
  10. def formatwarning(message, category, filename, lineno, line=None):
  11. """Function to format a warning the standard way."""
  12. msg = WarningMessage(message, category, filename, lineno, None, line)
  13. return _formatwarnmsg_impl(msg)
  14. def _showwarnmsg_impl(msg):
  15. file = msg.file
  16. if file is None:
  17. file = sys.stderr
  18. if file is None:
  19. # sys.stderr is None when run with pythonw.exe:
  20. # warnings get lost
  21. return
  22. text = _formatwarnmsg(msg)
  23. try:
  24. file.write(text)
  25. except OSError:
  26. # the file (probably stderr) is invalid - this warning gets lost.
  27. pass
  28. def _formatwarnmsg_impl(msg):
  29. category = msg.category.__name__
  30. s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
  31. if msg.line is None:
  32. try:
  33. import linecache
  34. line = linecache.getline(msg.filename, msg.lineno)
  35. except Exception:
  36. # When a warning is logged during Python shutdown, linecache
  37. # and the import machinery don't work anymore
  38. line = None
  39. linecache = None
  40. else:
  41. line = msg.line
  42. if line:
  43. line = line.strip()
  44. s += " %s\n" % line
  45. if msg.source is not None:
  46. try:
  47. import tracemalloc
  48. # Logging a warning should not raise a new exception:
  49. # catch Exception, not only ImportError and RecursionError.
  50. except Exception:
  51. # don't suggest to enable tracemalloc if it's not available
  52. tracing = True
  53. tb = None
  54. else:
  55. tracing = tracemalloc.is_tracing()
  56. try:
  57. tb = tracemalloc.get_object_traceback(msg.source)
  58. except Exception:
  59. # When a warning is logged during Python shutdown, tracemalloc
  60. # and the import machinery don't work anymore
  61. tb = None
  62. if tb is not None:
  63. s += 'Object allocated at (most recent call last):\n'
  64. for frame in tb:
  65. s += (' File "%s", lineno %s\n'
  66. % (frame.filename, frame.lineno))
  67. try:
  68. if linecache is not None:
  69. line = linecache.getline(frame.filename, frame.lineno)
  70. else:
  71. line = None
  72. except Exception:
  73. line = None
  74. if line:
  75. line = line.strip()
  76. s += ' %s\n' % line
  77. elif not tracing:
  78. s += (f'{category}: Enable tracemalloc to get the object '
  79. f'allocation traceback\n')
  80. return s
  81. # Keep a reference to check if the function was replaced
  82. _showwarning_orig = showwarning
  83. def _showwarnmsg(msg):
  84. """Hook to write a warning to a file; replace if you like."""
  85. try:
  86. sw = showwarning
  87. except NameError:
  88. pass
  89. else:
  90. if sw is not _showwarning_orig:
  91. # warnings.showwarning() was replaced
  92. if not callable(sw):
  93. raise TypeError("warnings.showwarning() must be set to a "
  94. "function or method")
  95. sw(msg.message, msg.category, msg.filename, msg.lineno,
  96. msg.file, msg.line)
  97. return
  98. _showwarnmsg_impl(msg)
  99. # Keep a reference to check if the function was replaced
  100. _formatwarning_orig = formatwarning
  101. def _formatwarnmsg(msg):
  102. """Function to format a warning the standard way."""
  103. try:
  104. fw = formatwarning
  105. except NameError:
  106. pass
  107. else:
  108. if fw is not _formatwarning_orig:
  109. # warnings.formatwarning() was replaced
  110. return fw(msg.message, msg.category,
  111. msg.filename, msg.lineno, msg.line)
  112. return _formatwarnmsg_impl(msg)
  113. def filterwarnings(action, message="", category=Warning, module="", lineno=0,
  114. append=False):
  115. """Insert an entry into the list of warnings filters (at the front).
  116. 'action' -- one of "error", "ignore", "always", "default", "module",
  117. or "once"
  118. 'message' -- a regex that the warning message must match
  119. 'category' -- a class that the warning must be a subclass of
  120. 'module' -- a regex that the module name must match
  121. 'lineno' -- an integer line number, 0 matches all warnings
  122. 'append' -- if true, append to the list of filters
  123. """
  124. assert action in ("error", "ignore", "always", "default", "module",
  125. "once"), "invalid action: %r" % (action,)
  126. assert isinstance(message, str), "message must be a string"
  127. assert isinstance(category, type), "category must be a class"
  128. assert issubclass(category, Warning), "category must be a Warning subclass"
  129. assert isinstance(module, str), "module must be a string"
  130. assert isinstance(lineno, int) and lineno >= 0, \
  131. "lineno must be an int >= 0"
  132. if message or module:
  133. import re
  134. if message:
  135. message = re.compile(message, re.I)
  136. else:
  137. message = None
  138. if module:
  139. module = re.compile(module)
  140. else:
  141. module = None
  142. _add_filter(action, message, category, module, lineno, append=append)
  143. def simplefilter(action, category=Warning, lineno=0, append=False):
  144. """Insert a simple entry into the list of warnings filters (at the front).
  145. A simple filter matches all modules and messages.
  146. 'action' -- one of "error", "ignore", "always", "default", "module",
  147. or "once"
  148. 'category' -- a class that the warning must be a subclass of
  149. 'lineno' -- an integer line number, 0 matches all warnings
  150. 'append' -- if true, append to the list of filters
  151. """
  152. assert action in ("error", "ignore", "always", "default", "module",
  153. "once"), "invalid action: %r" % (action,)
  154. assert isinstance(lineno, int) and lineno >= 0, \
  155. "lineno must be an int >= 0"
  156. _add_filter(action, None, category, None, lineno, append=append)
  157. def _add_filter(*item, append):
  158. # Remove possible duplicate filters, so new one will be placed
  159. # in correct place. If append=True and duplicate exists, do nothing.
  160. if not append:
  161. try:
  162. filters.remove(item)
  163. except ValueError:
  164. pass
  165. filters.insert(0, item)
  166. else:
  167. if item not in filters:
  168. filters.append(item)
  169. _filters_mutated()
  170. def resetwarnings():
  171. """Clear the list of warning filters, so that no filters are active."""
  172. filters[:] = []
  173. _filters_mutated()
  174. class _OptionError(Exception):
  175. """Exception used by option processing helpers."""
  176. pass
  177. # Helper to process -W options passed via sys.warnoptions
  178. def _processoptions(args):
  179. for arg in args:
  180. try:
  181. _setoption(arg)
  182. except _OptionError as msg:
  183. print("Invalid -W option ignored:", msg, file=sys.stderr)
  184. # Helper for _processoptions()
  185. def _setoption(arg):
  186. import re
  187. parts = arg.split(':')
  188. if len(parts) > 5:
  189. raise _OptionError("too many fields (max 5): %r" % (arg,))
  190. while len(parts) < 5:
  191. parts.append('')
  192. action, message, category, module, lineno = [s.strip()
  193. for s in parts]
  194. action = _getaction(action)
  195. message = re.escape(message)
  196. category = _getcategory(category)
  197. module = re.escape(module)
  198. if module:
  199. module = module + '$'
  200. if lineno:
  201. try:
  202. lineno = int(lineno)
  203. if lineno < 0:
  204. raise ValueError
  205. except (ValueError, OverflowError):
  206. raise _OptionError("invalid lineno %r" % (lineno,)) from None
  207. else:
  208. lineno = 0
  209. filterwarnings(action, message, category, module, lineno)
  210. # Helper for _setoption()
  211. def _getaction(action):
  212. if not action:
  213. return "default"
  214. if action == "all": return "always" # Alias
  215. for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
  216. if a.startswith(action):
  217. return a
  218. raise _OptionError("invalid action: %r" % (action,))
  219. # Helper for _setoption()
  220. def _getcategory(category):
  221. import re
  222. if not category:
  223. return Warning
  224. if re.match("^[a-zA-Z0-9_]+$", category):
  225. try:
  226. cat = eval(category)
  227. except NameError:
  228. raise _OptionError("unknown warning category: %r" % (category,)) from None
  229. else:
  230. i = category.rfind(".")
  231. module = category[:i]
  232. klass = category[i+1:]
  233. try:
  234. m = __import__(module, None, None, [klass])
  235. except ImportError:
  236. raise _OptionError("invalid module name: %r" % (module,)) from None
  237. try:
  238. cat = getattr(m, klass)
  239. except AttributeError:
  240. raise _OptionError("unknown warning category: %r" % (category,)) from None
  241. if not issubclass(cat, Warning):
  242. raise _OptionError("invalid warning category: %r" % (category,))
  243. return cat
  244. def _is_internal_frame(frame):
  245. """Signal whether the frame is an internal CPython implementation detail."""
  246. filename = frame.f_code.co_filename
  247. return 'importlib' in filename and '_bootstrap' in filename
  248. def _next_external_frame(frame):
  249. """Find the next frame that doesn't involve CPython internals."""
  250. frame = frame.f_back
  251. while frame is not None and _is_internal_frame(frame):
  252. frame = frame.f_back
  253. return frame
  254. # Code typically replaced by _warnings
  255. def warn(message, category=None, stacklevel=1, source=None):
  256. """Issue a warning, or maybe ignore it or raise an exception."""
  257. # Check if message is already a Warning object
  258. if isinstance(message, Warning):
  259. category = message.__class__
  260. # Check category argument
  261. if category is None:
  262. category = UserWarning
  263. if not (isinstance(category, type) and issubclass(category, Warning)):
  264. raise TypeError("category must be a Warning subclass, "
  265. "not '{:s}'".format(type(category).__name__))
  266. # Get context information
  267. try:
  268. if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
  269. # If frame is too small to care or if the warning originated in
  270. # internal code, then do not try to hide any frames.
  271. frame = sys._getframe(stacklevel)
  272. else:
  273. frame = sys._getframe(1)
  274. # Look for one frame less since the above line starts us off.
  275. for x in range(stacklevel-1):
  276. frame = _next_external_frame(frame)
  277. if frame is None:
  278. raise ValueError
  279. except ValueError:
  280. globals = sys.__dict__
  281. filename = "sys"
  282. lineno = 1
  283. else:
  284. globals = frame.f_globals
  285. filename = frame.f_code.co_filename
  286. lineno = frame.f_lineno
  287. if '__name__' in globals:
  288. module = globals['__name__']
  289. else:
  290. module = "<string>"
  291. registry = globals.setdefault("__warningregistry__", {})
  292. warn_explicit(message, category, filename, lineno, module, registry,
  293. globals, source)
  294. def warn_explicit(message, category, filename, lineno,
  295. module=None, registry=None, module_globals=None,
  296. source=None):
  297. lineno = int(lineno)
  298. if module is None:
  299. module = filename or "<unknown>"
  300. if module[-3:].lower() == ".py":
  301. module = module[:-3] # XXX What about leading pathname?
  302. if registry is None:
  303. registry = {}
  304. if registry.get('version', 0) != _filters_version:
  305. registry.clear()
  306. registry['version'] = _filters_version
  307. if isinstance(message, Warning):
  308. text = str(message)
  309. category = message.__class__
  310. else:
  311. text = message
  312. message = category(message)
  313. key = (text, category, lineno)
  314. # Quick test for common case
  315. if registry.get(key):
  316. return
  317. # Search the filters
  318. for item in filters:
  319. action, msg, cat, mod, ln = item
  320. if ((msg is None or msg.match(text)) and
  321. issubclass(category, cat) and
  322. (mod is None or mod.match(module)) and
  323. (ln == 0 or lineno == ln)):
  324. break
  325. else:
  326. action = defaultaction
  327. # Early exit actions
  328. if action == "ignore":
  329. return
  330. # Prime the linecache for formatting, in case the
  331. # "file" is actually in a zipfile or something.
  332. import linecache
  333. linecache.getlines(filename, module_globals)
  334. if action == "error":
  335. raise message
  336. # Other actions
  337. if action == "once":
  338. registry[key] = 1
  339. oncekey = (text, category)
  340. if onceregistry.get(oncekey):
  341. return
  342. onceregistry[oncekey] = 1
  343. elif action == "always":
  344. pass
  345. elif action == "module":
  346. registry[key] = 1
  347. altkey = (text, category, 0)
  348. if registry.get(altkey):
  349. return
  350. registry[altkey] = 1
  351. elif action == "default":
  352. registry[key] = 1
  353. else:
  354. # Unrecognized actions are errors
  355. raise RuntimeError(
  356. "Unrecognized action (%r) in warnings.filters:\n %s" %
  357. (action, item))
  358. # Print message and context
  359. msg = WarningMessage(message, category, filename, lineno, source)
  360. _showwarnmsg(msg)
  361. class WarningMessage(object):
  362. _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
  363. "line", "source")
  364. def __init__(self, message, category, filename, lineno, file=None,
  365. line=None, source=None):
  366. self.message = message
  367. self.category = category
  368. self.filename = filename
  369. self.lineno = lineno
  370. self.file = file
  371. self.line = line
  372. self.source = source
  373. self._category_name = category.__name__ if category else None
  374. def __str__(self):
  375. return ("{message : %r, category : %r, filename : %r, lineno : %s, "
  376. "line : %r}" % (self.message, self._category_name,
  377. self.filename, self.lineno, self.line))
  378. class catch_warnings(object):
  379. """A context manager that copies and restores the warnings filter upon
  380. exiting the context.
  381. The 'record' argument specifies whether warnings should be captured by a
  382. custom implementation of warnings.showwarning() and be appended to a list
  383. returned by the context manager. Otherwise None is returned by the context
  384. manager. The objects appended to the list are arguments whose attributes
  385. mirror the arguments to showwarning().
  386. The 'module' argument is to specify an alternative module to the module
  387. named 'warnings' and imported under that name. This argument is only useful
  388. when testing the warnings module itself.
  389. """
  390. def __init__(self, *, record=False, module=None):
  391. """Specify whether to record warnings and if an alternative module
  392. should be used other than sys.modules['warnings'].
  393. For compatibility with Python 3.0, please consider all arguments to be
  394. keyword-only.
  395. """
  396. self._record = record
  397. self._module = sys.modules['warnings'] if module is None else module
  398. self._entered = False
  399. def __repr__(self):
  400. args = []
  401. if self._record:
  402. args.append("record=True")
  403. if self._module is not sys.modules['warnings']:
  404. args.append("module=%r" % self._module)
  405. name = type(self).__name__
  406. return "%s(%s)" % (name, ", ".join(args))
  407. def __enter__(self):
  408. if self._entered:
  409. raise RuntimeError("Cannot enter %r twice" % self)
  410. self._entered = True
  411. self._filters = self._module.filters
  412. self._module.filters = self._filters[:]
  413. self._module._filters_mutated()
  414. self._showwarning = self._module.showwarning
  415. self._showwarnmsg_impl = self._module._showwarnmsg_impl
  416. if self._record:
  417. log = []
  418. self._module._showwarnmsg_impl = log.append
  419. # Reset showwarning() to the default implementation to make sure
  420. # that _showwarnmsg() calls _showwarnmsg_impl()
  421. self._module.showwarning = self._module._showwarning_orig
  422. return log
  423. else:
  424. return None
  425. def __exit__(self, *exc_info):
  426. if not self._entered:
  427. raise RuntimeError("Cannot exit %r without entering first" % self)
  428. self._module.filters = self._filters
  429. self._module._filters_mutated()
  430. self._module.showwarning = self._showwarning
  431. self._module._showwarnmsg_impl = self._showwarnmsg_impl
  432. # Private utility function called by _PyErr_WarnUnawaitedCoroutine
  433. def _warn_unawaited_coroutine(coro):
  434. msg_lines = [
  435. f"coroutine '{coro.__qualname__}' was never awaited\n"
  436. ]
  437. if coro.cr_origin is not None:
  438. import linecache, traceback
  439. def extract():
  440. for filename, lineno, funcname in reversed(coro.cr_origin):
  441. line = linecache.getline(filename, lineno)
  442. yield (filename, lineno, funcname, line)
  443. msg_lines.append("Coroutine created at (most recent call last)\n")
  444. msg_lines += traceback.format_list(list(extract()))
  445. msg = "".join(msg_lines).rstrip("\n")
  446. # Passing source= here means that if the user happens to have tracemalloc
  447. # enabled and tracking where the coroutine was created, the warning will
  448. # contain that traceback. This does mean that if they have *both*
  449. # coroutine origin tracking *and* tracemalloc enabled, they'll get two
  450. # partially-redundant tracebacks. If we wanted to be clever we could
  451. # probably detect this case and avoid it, but for now we don't bother.
  452. warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
  453. # filters contains a sequence of filter 5-tuples
  454. # The components of the 5-tuple are:
  455. # - an action: error, ignore, always, default, module, or once
  456. # - a compiled regex that must match the warning message
  457. # - a class representing the warning category
  458. # - a compiled regex that must match the module that is being warned
  459. # - a line number for the line being warning, or 0 to mean any line
  460. # If either if the compiled regexs are None, match anything.
  461. try:
  462. from _warnings import (filters, _defaultaction, _onceregistry,
  463. warn, warn_explicit, _filters_mutated)
  464. defaultaction = _defaultaction
  465. onceregistry = _onceregistry
  466. _warnings_defaults = True
  467. except ImportError:
  468. filters = []
  469. defaultaction = "default"
  470. onceregistry = {}
  471. _filters_version = 1
  472. def _filters_mutated():
  473. global _filters_version
  474. _filters_version += 1
  475. _warnings_defaults = False
  476. # Module initialization
  477. _processoptions(sys.warnoptions)
  478. if not _warnings_defaults:
  479. # Several warning categories are ignored by default in regular builds
  480. if not hasattr(sys, 'gettotalrefcount'):
  481. filterwarnings("default", category=DeprecationWarning,
  482. module="__main__", append=1)
  483. simplefilter("ignore", category=DeprecationWarning, append=1)
  484. simplefilter("ignore", category=PendingDeprecationWarning, append=1)
  485. simplefilter("ignore", category=ImportWarning, append=1)
  486. simplefilter("ignore", category=ResourceWarning, append=1)
  487. del _warnings_defaults