From 7e18b88f80c37654a8001a8fffff5a4107af5175 Mon Sep 17 00:00:00 2001 From: Simon Chopin Date: Thu, 8 Oct 2015 09:26:23 -0700 Subject: [PATCH] Remove the pycompat* submodules Those are not needed in Debian as we already ship the latest runtime version. Forwarded: not-needed Last-Update: 2013-04-30 Patch-Name: remove_compat_layers --- kitchen2/docs/api-overview.rst | 3 - kitchen2/docs/api-pycompat24.rst | 34 - kitchen2/docs/api-pycompat25.rst | 8 - kitchen2/docs/api-pycompat27.rst | 35 - kitchen2/kitchen/text/converters.py | 3 - kitchen2/kitchen/text/misc.py | 3 - .../tests/subprocessdata/sigchild_ignore.py | 11 - kitchen2/tests/test__all__.py | 2 - kitchen2/tests/test_base64.py | 190 --- kitchen2/tests/test_collections.py | 3 - kitchen2/tests/test_defaultdict.py | 180 -- kitchen2/tests/test_pycompat.py | 25 - kitchen2/tests/test_pycompat24.py | 109 -- kitchen2/tests/test_subprocess.py | 1467 ----------------- kitchen3/docs/api-overview.rst | 3 - kitchen3/docs/api-pycompat24.rst | 34 - kitchen3/docs/api-pycompat25.rst | 8 - kitchen3/docs/api-pycompat27.rst | 35 - .../tests/subprocessdata/sigchild_ignore.py | 11 - kitchen3/tests/test__all__.py | 2 - kitchen3/tests/test_collections.py | 3 - kitchen3/tests/test_deprecation_py3.py | 65 - kitchen3/tests/test_pycompat.py | 25 - setup.py | 14 - 24 files changed, 2273 deletions(-) delete mode 100644 kitchen2/docs/api-pycompat24.rst delete mode 100644 kitchen2/docs/api-pycompat25.rst delete mode 100644 kitchen2/docs/api-pycompat27.rst delete mode 100644 kitchen2/tests/subprocessdata/sigchild_ignore.py delete mode 100644 kitchen2/tests/test_base64.py delete mode 100644 kitchen2/tests/test_defaultdict.py delete mode 100644 kitchen2/tests/test_pycompat.py delete mode 100644 kitchen2/tests/test_pycompat24.py delete mode 100644 kitchen2/tests/test_subprocess.py delete mode 100644 kitchen3/docs/api-pycompat24.rst delete mode 100644 kitchen3/docs/api-pycompat25.rst delete mode 100644 kitchen3/docs/api-pycompat27.rst delete mode 100644 kitchen3/tests/subprocessdata/sigchild_ignore.py delete mode 100644 kitchen3/tests/test_deprecation_py3.py delete mode 100644 kitchen3/tests/test_pycompat.py diff --git a/kitchen2/docs/api-overview.rst b/kitchen2/docs/api-overview.rst index dda56fe..e53a94d 100644 --- a/kitchen2/docs/api-overview.rst +++ b/kitchen2/docs/api-overview.rst @@ -16,9 +16,6 @@ that may drag in more dependencies can be found on the `project webpage`_ api-collections api-iterutils api-versioning - api-pycompat24 - api-pycompat25 - api-pycompat27 api-exceptions .. _`project webpage`: https://fedorahosted.org/kitchen diff --git a/kitchen2/docs/api-pycompat24.rst b/kitchen2/docs/api-pycompat24.rst deleted file mode 100644 index a3247b6..0000000 --- a/kitchen2/docs/api-pycompat24.rst +++ /dev/null @@ -1,34 +0,0 @@ -======================= -Python 2.4 Compatibiity -======================= - - -------------------- -Sets for python-2.3 -------------------- - -.. automodule:: kitchen.pycompat24.sets -.. autofunction:: kitchen.pycompat24.sets.add_builtin_set - ----------------------------------- -Partial new style base64 interface ----------------------------------- - -.. automodule:: kitchen.pycompat24.base64 - :members: - ----------- -Subprocess ----------- - -.. seealso:: - - :mod:`kitchen.pycompat27.subprocess` - Kitchen includes the python-2.7 version of subprocess which has a new - function, :func:`~kitchen.pycompat27.subprocess.check_output`. When - you import :mod:`pycompat24.subprocess` you will be getting the - python-2.7 version of subprocess rather than the 2.4 version (where - subprocess first appeared). This choice was made so that we can - concentrate our efforts on keeping the single version of subprocess up - to date rather than working on a 2.4 version that very few people - would need specifically. diff --git a/kitchen2/docs/api-pycompat25.rst b/kitchen2/docs/api-pycompat25.rst deleted file mode 100644 index 1841c6a..0000000 --- a/kitchen2/docs/api-pycompat25.rst +++ /dev/null @@ -1,8 +0,0 @@ -======================== -Python 2.5 Compatibility -======================== - -.. automodule:: kitchen.pycompat25 - -.. automodule:: kitchen.pycompat25.collections._defaultdict - diff --git a/kitchen2/docs/api-pycompat27.rst b/kitchen2/docs/api-pycompat27.rst deleted file mode 100644 index 9654b31..0000000 --- a/kitchen2/docs/api-pycompat27.rst +++ /dev/null @@ -1,35 +0,0 @@ -======================== -Python 2.7 Compatibility -======================== - -.. module:: kitchen.pycompat27.subprocess - --------------------------- -Subprocess from Python 2.7 --------------------------- - -The :mod:`subprocess` module included here is a direct import from -python-2.7's |stdlib|_. You can access it via:: - - >>> from kitchen.pycompat27 import subprocess - -The motivation for including this module is that various API changing -improvements have been made to subprocess over time. The following is a list -of the known changes to :mod:`subprocess` with the python version they were -introduced in: - -==================================== === -New API Feature Ver -==================================== === -:exc:`subprocess.CalledProcessError` 2.5 -:func:`subprocess.check_call` 2.5 -:func:`subprocess.check_output` 2.7 -:meth:`subprocess.Popen.send_signal` 2.6 -:meth:`subprocess.Popen.terminate` 2.6 -:meth:`subprocess.Popen.kill` 2.6 -==================================== === - -.. seealso:: - - The stdlib :mod:`subprocess` documentation - For complete documentation on how to use subprocess diff --git a/kitchen2/kitchen/text/converters.py b/kitchen2/kitchen/text/converters.py index e4bcb0d..eb518cb 100644 --- a/kitchen2/kitchen/text/converters.py +++ b/kitchen2/kitchen/text/converters.py @@ -50,9 +50,6 @@ import codecs import warnings import xml.sax.saxutils -from kitchen.pycompat24 import sets -sets.add_builtin_set() - from kitchen.text.exceptions import ControlCharError, XmlEncodeError from kitchen.text.misc import guess_encoding, html_entities_unescape, \ isbytestring, isunicodestring, process_control_chars diff --git a/kitchen2/kitchen/text/misc.py b/kitchen2/kitchen/text/misc.py index 333e363..babb68e 100644 --- a/kitchen2/kitchen/text/misc.py +++ b/kitchen2/kitchen/text/misc.py @@ -43,11 +43,8 @@ try: except ImportError: chardet = None -from kitchen.pycompat24 import sets from kitchen.text.exceptions import ControlCharError -sets.add_builtin_set() - # Define a threshold for chardet confidence. If we fall below this we decode # byte strings we're guessing about as latin1 _CHARDET_THRESHHOLD = 0.6 diff --git a/kitchen2/tests/subprocessdata/sigchild_ignore.py b/kitchen2/tests/subprocessdata/sigchild_ignore.py deleted file mode 100644 index 5b6dd08..0000000 --- a/kitchen2/tests/subprocessdata/sigchild_ignore.py +++ /dev/null @@ -1,11 +0,0 @@ -import os -import signal, sys -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) - -from kitchen.pycompat27.subprocess import _subprocess as subprocess - -# On Linux this causes os.waitpid to fail with OSError as the OS has already -# reaped our child process. The wait() passing the OSError on to the caller -# and causing us to exit with an error is what we are testing against. -signal.signal(signal.SIGCHLD, signal.SIG_IGN) -subprocess.Popen([sys.executable, '-c', 'print("albatross")']).wait() diff --git a/kitchen2/tests/test__all__.py b/kitchen2/tests/test__all__.py index dda287a..e08d699 100644 --- a/kitchen2/tests/test__all__.py +++ b/kitchen2/tests/test__all__.py @@ -4,8 +4,6 @@ from nose import tools import os import types import warnings -from kitchen.pycompat24.sets import add_builtin_set -add_builtin_set() def logit(msg): log = open('/var/tmp/test.log', 'a') diff --git a/kitchen2/tests/test_base64.py b/kitchen2/tests/test_base64.py deleted file mode 100644 index 49e1cd1..0000000 --- a/kitchen2/tests/test_base64.py +++ /dev/null @@ -1,190 +0,0 @@ -import unittest -from kitchen.pycompat24.base64 import _base64 as base64 - - - -class LegacyBase64TestCase(unittest.TestCase): - def test_encodestring(self): - eq = self.assertEqual - eq(base64.encodestring("www.python.org"), "d3d3LnB5dGhvbi5vcmc=\n") - eq(base64.encodestring("a"), "YQ==\n") - eq(base64.encodestring("ab"), "YWI=\n") - eq(base64.encodestring("abc"), "YWJj\n") - eq(base64.encodestring(""), "") - eq(base64.encodestring("abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}"), - "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n") - - def test_decodestring(self): - eq = self.assertEqual - eq(base64.decodestring("d3d3LnB5dGhvbi5vcmc=\n"), "www.python.org") - eq(base64.decodestring("YQ==\n"), "a") - eq(base64.decodestring("YWI=\n"), "ab") - eq(base64.decodestring("YWJj\n"), "abc") - eq(base64.decodestring("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n"), - "abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}") - eq(base64.decodestring(''), '') - - def test_encode(self): - eq = self.assertEqual - from cStringIO import StringIO - infp = StringIO('abcdefghijklmnopqrstuvwxyz' - 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' - '0123456789!@#0^&*();:<>,. []{}') - outfp = StringIO() - base64.encode(infp, outfp) - eq(outfp.getvalue(), - 'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE' - 'RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT' - 'Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n') - - def test_decode(self): - from cStringIO import StringIO - infp = StringIO('d3d3LnB5dGhvbi5vcmc=') - outfp = StringIO() - base64.decode(infp, outfp) - self.assertEqual(outfp.getvalue(), 'www.python.org') - - - -class BaseXYTestCase(unittest.TestCase): - def test_b64encode(self): - eq = self.assertEqual - # Test default alphabet - eq(base64.b64encode("www.python.org"), "d3d3LnB5dGhvbi5vcmc=") - eq(base64.b64encode('\x00'), 'AA==') - eq(base64.b64encode("a"), "YQ==") - eq(base64.b64encode("ab"), "YWI=") - eq(base64.b64encode("abc"), "YWJj") - eq(base64.b64encode(""), "") - eq(base64.b64encode("abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}"), - "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==") - # Test with arbitrary alternative characters - eq(base64.b64encode('\xd3V\xbeo\xf7\x1d', altchars='*$'), '01a*b$cd') - # Test standard alphabet - eq(base64.standard_b64encode("www.python.org"), "d3d3LnB5dGhvbi5vcmc=") - eq(base64.standard_b64encode("a"), "YQ==") - eq(base64.standard_b64encode("ab"), "YWI=") - eq(base64.standard_b64encode("abc"), "YWJj") - eq(base64.standard_b64encode(""), "") - eq(base64.standard_b64encode("abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}"), - "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==") - # Test with 'URL safe' alternative characters - eq(base64.urlsafe_b64encode('\xd3V\xbeo\xf7\x1d'), '01a-b_cd') - - def test_b64decode(self): - eq = self.assertEqual - eq(base64.b64decode("d3d3LnB5dGhvbi5vcmc="), "www.python.org") - eq(base64.b64decode('AA=='), '\x00') - eq(base64.b64decode("YQ=="), "a") - eq(base64.b64decode("YWI="), "ab") - eq(base64.b64decode("YWJj"), "abc") - eq(base64.b64decode("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ=="), - "abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}") - eq(base64.b64decode(''), '') - # Test with arbitrary alternative characters - eq(base64.b64decode('01a*b$cd', altchars='*$'), '\xd3V\xbeo\xf7\x1d') - # Test standard alphabet - eq(base64.standard_b64decode("d3d3LnB5dGhvbi5vcmc="), "www.python.org") - eq(base64.standard_b64decode("YQ=="), "a") - eq(base64.standard_b64decode("YWI="), "ab") - eq(base64.standard_b64decode("YWJj"), "abc") - eq(base64.standard_b64decode(""), "") - eq(base64.standard_b64decode("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE" - "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT" - "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ=="), - "abcdefghijklmnopqrstuvwxyz" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "0123456789!@#0^&*();:<>,. []{}") - # Test with 'URL safe' alternative characters - eq(base64.urlsafe_b64decode('01a-b_cd'), '\xd3V\xbeo\xf7\x1d') - - def test_b64decode_error(self): - self.assertRaises(TypeError, base64.b64decode, 'abc') - - def test_b32encode(self): - eq = self.assertEqual - eq(base64.b32encode(''), '') - eq(base64.b32encode('\x00'), 'AA======') - eq(base64.b32encode('a'), 'ME======') - eq(base64.b32encode('ab'), 'MFRA====') - eq(base64.b32encode('abc'), 'MFRGG===') - eq(base64.b32encode('abcd'), 'MFRGGZA=') - eq(base64.b32encode('abcde'), 'MFRGGZDF') - - def test_b32decode(self): - eq = self.assertEqual - eq(base64.b32decode(''), '') - eq(base64.b32decode('AA======'), '\x00') - eq(base64.b32decode('ME======'), 'a') - eq(base64.b32decode('MFRA===='), 'ab') - eq(base64.b32decode('MFRGG==='), 'abc') - eq(base64.b32decode('MFRGGZA='), 'abcd') - eq(base64.b32decode('MFRGGZDF'), 'abcde') - - def test_b32decode_casefold(self): - eq = self.assertEqual - eq(base64.b32decode('', True), '') - eq(base64.b32decode('ME======', True), 'a') - eq(base64.b32decode('MFRA====', True), 'ab') - eq(base64.b32decode('MFRGG===', True), 'abc') - eq(base64.b32decode('MFRGGZA=', True), 'abcd') - eq(base64.b32decode('MFRGGZDF', True), 'abcde') - # Lower cases - eq(base64.b32decode('me======', True), 'a') - eq(base64.b32decode('mfra====', True), 'ab') - eq(base64.b32decode('mfrgg===', True), 'abc') - eq(base64.b32decode('mfrggza=', True), 'abcd') - eq(base64.b32decode('mfrggzdf', True), 'abcde') - # Expected exceptions - self.assertRaises(TypeError, base64.b32decode, 'me======') - # Mapping zero and one - eq(base64.b32decode('MLO23456'), 'b\xdd\xad\xf3\xbe') - eq(base64.b32decode('M1023456', map01='L'), 'b\xdd\xad\xf3\xbe') - eq(base64.b32decode('M1023456', map01='I'), 'b\x1d\xad\xf3\xbe') - - def test_b32decode_error(self): - self.assertRaises(TypeError, base64.b32decode, 'abc') - self.assertRaises(TypeError, base64.b32decode, 'ABCDEF==') - - def test_b16encode(self): - eq = self.assertEqual - eq(base64.b16encode('\x01\x02\xab\xcd\xef'), '0102ABCDEF') - eq(base64.b16encode('\x00'), '00') - - def test_b16decode(self): - eq = self.assertEqual - eq(base64.b16decode('0102ABCDEF'), '\x01\x02\xab\xcd\xef') - eq(base64.b16decode('00'), '\x00') - # Lower case is not allowed without a flag - self.assertRaises(TypeError, base64.b16decode, '0102abcdef') - # Case fold - eq(base64.b16decode('0102abcdef', True), '\x01\x02\xab\xcd\xef') - - - -#from test import test_support -#def test_main(): -# test_support.run_unittest(__name__) -# -#if __name__ == '__main__': -# test_main() diff --git a/kitchen2/tests/test_collections.py b/kitchen2/tests/test_collections.py index 5e4dbd4..8765594 100644 --- a/kitchen2/tests/test_collections.py +++ b/kitchen2/tests/test_collections.py @@ -3,9 +3,6 @@ import unittest from nose import tools -from kitchen.pycompat24.sets import add_builtin_set -add_builtin_set() - from kitchen import collections def test_strict_dict_get_set(): diff --git a/kitchen2/tests/test_defaultdict.py b/kitchen2/tests/test_defaultdict.py deleted file mode 100644 index 5848d28..0000000 --- a/kitchen2/tests/test_defaultdict.py +++ /dev/null @@ -1,180 +0,0 @@ -"""Unit tests for collections.defaultdict.""" - -import os -import copy -import tempfile -import unittest - -from kitchen.pycompat25.collections._defaultdict import defaultdict - -def foobar(): - return list - -class TestDefaultDict(unittest.TestCase): - - def test_basic(self): - d1 = defaultdict() - self.assertEqual(d1.default_factory, None) - d1.default_factory = list - d1[12].append(42) - self.assertEqual(d1, {12: [42]}) - d1[12].append(24) - self.assertEqual(d1, {12: [42, 24]}) - d1[13] - d1[14] - self.assertEqual(d1, {12: [42, 24], 13: [], 14: []}) - self.assert_(d1[12] is not d1[13] is not d1[14]) - d2 = defaultdict(list, foo=1, bar=2) - self.assertEqual(d2.default_factory, list) - self.assertEqual(d2, {"foo": 1, "bar": 2}) - self.assertEqual(d2["foo"], 1) - self.assertEqual(d2["bar"], 2) - self.assertEqual(d2[42], []) - self.assert_("foo" in d2) - self.assert_("foo" in d2.keys()) - self.assert_("bar" in d2) - self.assert_("bar" in d2.keys()) - self.assert_(42 in d2) - self.assert_(42 in d2.keys()) - self.assert_(12 not in d2) - self.assert_(12 not in d2.keys()) - d2.default_factory = None - self.assertEqual(d2.default_factory, None) - try: - d2[15] - except KeyError, err: - self.assertEqual(err.args, (15,)) - else: - self.fail("d2[15] didn't raise KeyError") - self.assertRaises(TypeError, defaultdict, 1) - - def test_missing(self): - d1 = defaultdict() - self.assertRaises(KeyError, d1.__missing__, 42) - d1.default_factory = list - self.assertEqual(d1.__missing__(42), []) - - def test_repr(self): - d1 = defaultdict() - self.assertEqual(d1.default_factory, None) - self.assertEqual(repr(d1), "defaultdict(None, {})") - self.assertEqual(eval(repr(d1)), d1) - d1[11] = 41 - self.assertEqual(repr(d1), "defaultdict(None, {11: 41})") - d2 = defaultdict(int) - self.assertEqual(d2.default_factory, int) - d2[12] = 42 - self.assertEqual(repr(d2), "defaultdict(, {12: 42})") - def foo(): return 43 - d3 = defaultdict(foo) - - self.assert_(d3.default_factory is foo) - d3[13] - self.assertEqual(repr(d3), "defaultdict(%s, {13: 43})" % repr(foo)) - - def test_print(self): - d1 = defaultdict() - def foo(): return 42 - d2 = defaultdict(foo, {1: 2}) - # NOTE: We can't use tempfile.[Named]TemporaryFile since this - # code must exercise the tp_print C code, which only gets - # invoked for *real* files. - tfn = tempfile.mktemp() - try: - f = open(tfn, "w+") - try: - print >>f, d1 - print >>f, d2 - f.seek(0) - self.assertEqual(f.readline(), repr(d1) + "\n") - self.assertEqual(f.readline(), repr(d2) + "\n") - finally: - f.close() - finally: - os.remove(tfn) - - def test_copy(self): - d1 = defaultdict() - d2 = d1.copy() - self.assertEqual(type(d2), defaultdict) - self.assertEqual(d2.default_factory, None) - self.assertEqual(d2, {}) - d1.default_factory = list - d3 = d1.copy() - self.assertEqual(type(d3), defaultdict) - self.assertEqual(d3.default_factory, list) - self.assertEqual(d3, {}) - d1[42] - d4 = d1.copy() - self.assertEqual(type(d4), defaultdict) - self.assertEqual(d4.default_factory, list) - self.assertEqual(d4, {42: []}) - d4[12] - self.assertEqual(d4, {42: [], 12: []}) - - # Issue 6637: Copy fails for empty default dict - d = defaultdict() - d['a'] = 42 - e = d.copy() - self.assertEqual(e['a'], 42) - - def test_shallow_copy(self): - d1 = defaultdict(foobar, {1: 1}) - d2 = copy.copy(d1) - self.assertEqual(d2.default_factory, foobar) - self.assertEqual(d2, d1) - d1.default_factory = list - d2 = copy.copy(d1) - self.assertEqual(d2.default_factory, list) - self.assertEqual(d2, d1) - - def test_deep_copy(self): - d1 = defaultdict(foobar, {1: [1]}) - d2 = copy.deepcopy(d1) - self.assertEqual(d2.default_factory, foobar) - self.assertEqual(d2, d1) - self.assert_(d1[1] is not d2[1]) - d1.default_factory = list - d2 = copy.deepcopy(d1) - self.assertEqual(d2.default_factory, list) - self.assertEqual(d2, d1) - - def test_keyerror_without_factory(self): - d1 = defaultdict() - try: - d1[(1,)] - except KeyError, err: - self.assertEqual(err.args[0], (1,)) - else: - self.fail("expected KeyError") - - def test_recursive_repr(self): - # Issue2045: stack overflow when default_factory is a bound method - class sub(defaultdict): - def __init__(self): - self.default_factory = self._factory - def _factory(self): - return [] - d = sub() - self.assert_(repr(d).startswith( - "defaultdict(>f, d - finally: - f.close() - finally: - os.remove(tfn) - - -#from test import test_support -#def test_main(): -# test_support.run_unittest(TestDefaultDict) -# -#if __name__ == "__main__": -# test_main() diff --git a/kitchen2/tests/test_pycompat.py b/kitchen2/tests/test_pycompat.py deleted file mode 100644 index 50a059b..0000000 --- a/kitchen2/tests/test_pycompat.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- coding: utf-8 -*- -# -import unittest -from nose import tools - -class TestUsableModules(unittest.TestCase): - def test_subprocess(self): - '''Test that importing subprocess as a module works - ''' - try: - from kitchen.pycompat24.subprocess import Popen - except ImportError: - tools.ok_(False, 'Unable to import pycompat24.subprocess as a module') - try: - from kitchen.pycompat27.subprocess import Popen - except ImportError: - tools.ok_(False, 'Unable to import pycompat27.subprocess as a module') - - def test_base64(self): - '''Test that importing base64 as a module works - ''' - try: - from kitchen.pycompat24.base64 import b64encode - except ImportError: - tools.ok_(False, 'Unable to import pycompat24.base64 as a module') diff --git a/kitchen2/tests/test_pycompat24.py b/kitchen2/tests/test_pycompat24.py deleted file mode 100644 index adea7fe..0000000 --- a/kitchen2/tests/test_pycompat24.py +++ /dev/null @@ -1,109 +0,0 @@ -# -*- coding: utf-8 -*- -# -import unittest -from nose import tools -from nose.plugins.skip import SkipTest - -import __builtin__ -import base64 as py_b64 -import warnings - -from kitchen.pycompat24 import sets -from kitchen.pycompat24.base64 import _base64 as base64 - -class TestSetsNoOverwrite(unittest.TestCase): - def setUp(self): - self.set_val = None - self.frozenset_val = None - if not hasattr(__builtin__, 'set'): - __builtin__.set = self.set_val - else: - self.set_val = __builtin__.set - if not hasattr(__builtin__, 'frozenset'): - __builtin__.frozenset = self.frozenset_val - else: - self.frozenset_val = __builtin__.frozenset - - def tearDown(self): - if self.frozenset_val == None: - del(__builtin__.frozenset) - if self.set_val == None: - del(__builtin__.set) - - def test_sets_dont_overwrite(self): - '''Test that importing sets when there's already a set and frozenset defined does not overwrite - ''' - sets.add_builtin_set() - tools.ok_(__builtin__.set == self.set_val) - tools.ok_(__builtin__.frozenset == self.frozenset_val) - -class TestDefineSets(unittest.TestCase): - def setUp(self): - warnings.simplefilter('ignore', DeprecationWarning) - self.set_val = None - self.frozenset_val = None - if hasattr(__builtin__, 'set'): - self.set_val = __builtin__.set - del(__builtin__.set) - if hasattr(__builtin__, 'frozenset'): - self.frozenset_val = __builtin__.frozenset - del(__builtin__.frozenset) - - def tearDown(self): - warnings.simplefilter('default', DeprecationWarning) - if self.set_val: - __builtin__.set = self.set_val - else: - del(__builtin__.set) - if self.frozenset_val: - __builtin__.frozenset = self.frozenset_val - else: - del(__builtin__.frozenset) - - def test_pycompat_defines_set(self): - '''Test that calling pycompat24.add_builtin_set() adds set and frozenset to __builtin__ - ''' - import sets as py_sets - sets.add_builtin_set() - if self.set_val: - tools.ok_(__builtin__.set == self.set_val) - tools.ok_(__builtin__.frozenset == self.frozenset_val) - else: - tools.ok_(__builtin__.set == py_sets.Set) - tools.ok_(__builtin__.frozenset == py_sets.ImmutableSet) - -class TestSubprocess(unittest.TestCase): - pass - -class TestBase64(unittest.TestCase): - b_byte_chars = ' '.join(map(chr, range(0, 256))) - b_byte_encoded = 'ACABIAIgAyAEIAUgBiAHIAggCSAKIAsgDCANIA4gDyAQIBEgEiATIBQgFSAWIBcgGCAZIBogGyAcIB0gHiAfICAgISAiICMgJCAlICYgJyAoICkgKiArICwgLSAuIC8gMCAxIDIgMyA0IDUgNiA3IDggOSA6IDsgPCA9ID4gPyBAIEEgQiBDIEQgRSBGIEcgSCBJIEogSyBMIE0gTiBPIFAgUSBSIFMgVCBVIFYgVyBYIFkgWiBbIFwgXSBeIF8gYCBhIGIgYyBkIGUgZiBnIGggaSBqIGsgbCBtIG4gbyBwIHEgciBzIHQgdSB2IHcgeCB5IHogeyB8IH0gfiB/IIAggSCCIIMghCCFIIYghyCIIIkgiiCLIIwgjSCOII8gkCCRIJIgkyCUIJUgliCXIJggmSCaIJsgnCCdIJ4gnyCgIKEgoiCjIKQgpSCmIKcgqCCpIKogqyCsIK0griCvILAgsSCyILMgtCC1ILYgtyC4ILkguiC7ILwgvSC+IL8gwCDBIMIgwyDEIMUgxiDHIMggySDKIMsgzCDNIM4gzyDQINEg0iDTINQg1SDWINcg2CDZINog2yDcIN0g3iDfIOAg4SDiIOMg5CDlIOYg5yDoIOkg6iDrIOwg7SDuIO8g8CDxIPIg8yD0IPUg9iD3IPgg+SD6IPsg/CD9IP4g/w==' - b_byte_encoded_urlsafe = 'ACABIAIgAyAEIAUgBiAHIAggCSAKIAsgDCANIA4gDyAQIBEgEiATIBQgFSAWIBcgGCAZIBogGyAcIB0gHiAfICAgISAiICMgJCAlICYgJyAoICkgKiArICwgLSAuIC8gMCAxIDIgMyA0IDUgNiA3IDggOSA6IDsgPCA9ID4gPyBAIEEgQiBDIEQgRSBGIEcgSCBJIEogSyBMIE0gTiBPIFAgUSBSIFMgVCBVIFYgVyBYIFkgWiBbIFwgXSBeIF8gYCBhIGIgYyBkIGUgZiBnIGggaSBqIGsgbCBtIG4gbyBwIHEgciBzIHQgdSB2IHcgeCB5IHogeyB8IH0gfiB_IIAggSCCIIMghCCFIIYghyCIIIkgiiCLIIwgjSCOII8gkCCRIJIgkyCUIJUgliCXIJggmSCaIJsgnCCdIJ4gnyCgIKEgoiCjIKQgpSCmIKcgqCCpIKogqyCsIK0griCvILAgsSCyILMgtCC1ILYgtyC4ILkguiC7ILwgvSC-IL8gwCDBIMIgwyDEIMUgxiDHIMggySDKIMsgzCDNIM4gzyDQINEg0iDTINQg1SDWINcg2CDZINog2yDcIN0g3iDfIOAg4SDiIOMg5CDlIOYg5yDoIOkg6iDrIOwg7SDuIO8g8CDxIPIg8yD0IPUg9iD3IPgg-SD6IPsg_CD9IP4g_w==' - - def test_base64_encode(self): - tools.ok_(base64.b64encode(self.b_byte_chars) == self.b_byte_encoded) - tools.ok_(base64.b64encode(self.b_byte_chars, altchars='-_') == self.b_byte_encoded_urlsafe) - tools.ok_(base64.standard_b64encode(self.b_byte_chars) == self.b_byte_encoded) - tools.ok_(base64.urlsafe_b64encode(self.b_byte_chars) == self.b_byte_encoded_urlsafe) - - tools.ok_(base64.b64encode(self.b_byte_chars) == self.b_byte_encoded) - tools.ok_(base64.b64encode(self.b_byte_chars, altchars='-_') == self.b_byte_encoded_urlsafe) - tools.ok_(base64.standard_b64encode(self.b_byte_chars) == self.b_byte_encoded) - tools.ok_(base64.urlsafe_b64encode(self.b_byte_chars) == self.b_byte_encoded_urlsafe) - - def test_base64_decode(self): - tools.ok_(base64.b64decode(self.b_byte_encoded) == self.b_byte_chars) - tools.ok_(base64.b64decode(self.b_byte_encoded_urlsafe, altchars='-_') == self.b_byte_chars) - tools.ok_(base64.standard_b64decode(self.b_byte_encoded) == self.b_byte_chars) - tools.ok_(base64.urlsafe_b64decode(self.b_byte_encoded_urlsafe) == self.b_byte_chars) - - tools.ok_(base64.b64decode(self.b_byte_encoded) == self.b_byte_chars) - tools.ok_(base64.b64decode(self.b_byte_encoded_urlsafe, altchars='-_') == self.b_byte_chars) - tools.ok_(base64.standard_b64decode(self.b_byte_encoded) == self.b_byte_chars) - tools.ok_(base64.urlsafe_b64decode(self.b_byte_encoded_urlsafe) == self.b_byte_chars) - - def test_base64_stdlib_compat(self): - if not hasattr(py_b64, 'b64encode'): - raise SkipTest('Python-2.3 doesn\'t have b64encode to compare against') - tools.ok_(base64.b64encode(self.b_byte_chars) == py_b64.b64encode(self.b_byte_chars)) - tools.ok_(base64.b64decode(self.b_byte_chars) == py_b64.b64decode(self.b_byte_chars)) diff --git a/kitchen2/tests/test_subprocess.py b/kitchen2/tests/test_subprocess.py deleted file mode 100644 index 010f660..0000000 --- a/kitchen2/tests/test_subprocess.py +++ /dev/null @@ -1,1467 +0,0 @@ -import unittest -from nose.plugins.skip import SkipTest -from kitchen.pycompat27.subprocess import _subprocess as subprocess -import sys -import StringIO -import signal -import os -import errno -import tempfile -import time -import re -# Not available on python2.6 or less -#import sysconfig - -mswindows = (sys.platform == "win32") - -# -# Depends on the following external programs: Python -# - -if mswindows: - SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' - 'os.O_BINARY);') -else: - SETBINARY = '' - -def reap_children(): - """Use this function at the end of test_main() whenever sub-processes - are started. This will help ensure that no extra children (zombies) - stick around to hog resources and create problems when looking - for refleaks. - """ - - # Reap all our dead child processes so we don't leave zombies around. - # These hog resources and might be causing some of the buildbots to die. - if hasattr(os, 'waitpid'): - any_process = -1 - while True: - try: - # This will raise an exception on Windows. That's ok. - pid, status = os.waitpid(any_process, os.WNOHANG) - if pid == 0: - break - except: - break - -test_support = None -try: - from test import test_support - if not hasattr(test_support, 'reap_children'): - # No reap_children in python-2.3 - test_support.reap_children = reap_children -except ImportError: - pass - -# In a debug build, stuff like "[6580 refs]" is printed to stderr at -# shutdown time. That frustrates tests trying to check stderr produced -# from a spawned Python process. -def remove_stderr_debug_decorations(stderr): - return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) - -try: - mkstemp = tempfile.mkstemp -except AttributeError: - # tempfile.mkstemp is not available - def mkstemp(): - """Replacement for mkstemp, calling mktemp.""" - fname = tempfile.mktemp() - return os.open(fname, os.O_RDWR|os.O_CREAT), fname - - -class BaseTestCase(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - if not hasattr(self, '_cleanups'): - self._cleanups = [] - if not hasattr(self, 'addCleanup'): - self.addCleanup = self._addCleanup - - def _addCleanup(self, function, *args, **kwargs): - self._cleanups.append((function, args, kwargs)) - - def setUp(self): - # Try to minimize the number of children we have so this test - # doesn't crash on some buildbots (Alphas in particular). - if test_support: - test_support.reap_children() - - def tearDown(self): - for inst in subprocess._active: - inst.wait() - subprocess._cleanup() - # assertFalse is not available in python-2.3 - self.failIf(subprocess._active, "subprocess._active not empty") - - if not hasattr(self, 'doCleanups'): - ok = True - while self._cleanups: - function, args, kwargs = self._cleanups.pop(-1) - try: - function(*args, **kwargs) - except Exception: - ok = False - - def assertStderrEqual(self, stderr, expected, msg=None): - # In a debug build, stuff like "[6580 refs]" is printed to stderr at - # shutdown time. That frustrates tests trying to check stderr produced - # from a spawned Python process. - actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) - self.assertEqual(actual, expected, msg) - - -class ProcessTestCase(BaseTestCase): - - def test_call_seq(self): - # call() function with sequence argument - rc = subprocess.call([sys.executable, "-c", - "import sys; sys.exit(47)"]) - self.assertEqual(rc, 47) - - def test_check_call_zero(self): - # check_call() function with zero return code - rc = subprocess.check_call([sys.executable, "-c", - "import sys; sys.exit(0)"]) - self.assertEqual(rc, 0) - - def test_check_call_nonzero(self): - # check_call() function with non-zero return code - #with self.assertRaises(subprocess.CalledProcessError) as c: - try: - subprocess.check_call([sys.executable, "-c", - "import sys; sys.exit(47)"]) - #self.assertEqual(c.exception.returncode, 47) - except subprocess.CalledProcessError, e: - self.assertEqual(e.returncode, 47) - else: - self.fail("Expected CalledProcessError") - - def test_check_output(self): - # check_output() function with zero return code - output = subprocess.check_output( - [sys.executable, "-c", "print 'BDFL'"]) - #self.assertIn('BDFL', output) - self.assert_('BDFL' in output) - - def test_check_output_nonzero(self): - # check_call() function with non-zero return code - #with self.assertRaises(subprocess.CalledProcessError) as c: - try: - subprocess.check_output( - [sys.executable, "-c", "import sys; sys.exit(5)"]) - #self.assertEqual(c.exception.returncode, 5) - except subprocess.CalledProcessError, e: - self.assertEqual(e.returncode, 5) - else: - self.fail("Expected CalledProcessError") - - def test_check_output_stderr(self): - # check_output() function stderr redirected to stdout - output = subprocess.check_output( - [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], - stderr=subprocess.STDOUT) - #self.assertIn('BDFL', output) - self.assert_('BDFL' in output) - - def test_check_output_stdout_arg(self): - # check_output() function stderr redirected to stdout - #with self.assertRaises(ValueError) as c: - try: - output = subprocess.check_output( - [sys.executable, "-c", "print 'will not be run'"], - stdout=sys.stdout) - #self.fail("Expected ValueError when stdout arg supplied.") - #self.assertIn('stdout', c.exception.args[0]) - except ValueError, e: - #self.assertIn('stdout', e.args[0]) - self.assert_('stdout' in e.args[0]) - else: - self.fail("Expected ValueError when stdout arg supplied.") - - def test_call_kwargs(self): - # call() function with keyword args - newenv = os.environ.copy() - newenv["FRUIT"] = "banana" - rc = subprocess.call([sys.executable, "-c", - 'import sys, os;' - 'sys.exit(os.getenv("FRUIT")=="banana")'], - env=newenv) - self.assertEqual(rc, 1) - - def test_invalid_args(self): - # Popen() called with invalid arguments should raise TypeError - # but Popen.__del__ should not complain (issue #12085) - #with test_support.captured_stderr() as s: - orig_stderr = getattr(sys, 'stderr') - setattr(sys, 'stderr', StringIO.StringIO()) - s = sys.stderr - try: - try: - subprocess.Popen(invalid_arg_name=1) - except TypeError: - pass - except: - self.fail("Expected TypeError") - else: - self.fail("Expected TypeError") - argcount = subprocess.Popen.__init__.func_code.co_argcount - too_many_args = [0] * (argcount + 1) - try: - subprocess.Popen(*too_many_args) - except TypeError: - pass - except: - self.fail("Expected TypeError") - else: - self.fail("Expected TypeError") - finally: - setattr(sys,'stderr', orig_stderr) - self.assertEqual(s.getvalue(), '') - - def test_stdin_none(self): - # .stdin is None when not redirected - p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - p.wait() - self.assertEqual(p.stdin, None) - - def test_stdout_none(self): - # .stdout is None when not redirected - p = subprocess.Popen([sys.executable, "-c", - 'print " this bit of output is from a ' - 'test of stdout in a different ' - 'process ..."'], - stdin=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdin.close) - self.addCleanup(p.stderr.close) - p.wait() - self.assertEqual(p.stdout, None) - - def test_stderr_none(self): - # .stderr is None when not redirected - p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stdin.close) - p.wait() - self.assertEqual(p.stderr, None) - - def test_executable_with_cwd(self): - python_dir = os.path.dirname(os.path.realpath(sys.executable)) - p = subprocess.Popen(["somethingyoudonthave", "-c", - "import sys; sys.exit(47)"], - executable=sys.executable, cwd=python_dir) - p.wait() - self.assertEqual(p.returncode, 47) - - # Not available on python2.3 and we know we're not building python itself - #@unittest.skipIf(sysconfig.is_python_build(), - # "need an installed Python. See #7774") - def test_executable_without_cwd(self): - # For a normal installation, it should work without 'cwd' - # argument. For test runs in the build directory, see #7774. - p = subprocess.Popen(["somethingyoudonthave", "-c", - "import sys; sys.exit(47)"], - executable=sys.executable) - p.wait() - self.assertEqual(p.returncode, 47) - - def test_stdin_pipe(self): - # stdin redirection - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.exit(sys.stdin.read() == "pear")'], - stdin=subprocess.PIPE) - p.stdin.write("pear") - p.stdin.close() - p.wait() - self.assertEqual(p.returncode, 1) - - def test_stdin_filedes(self): - # stdin is set to open file descriptor - tf = tempfile.TemporaryFile() - d = tf.fileno() - os.write(d, "pear") - os.lseek(d, 0, 0) - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.exit(sys.stdin.read() == "pear")'], - stdin=d) - p.wait() - self.assertEqual(p.returncode, 1) - - def test_stdin_fileobj(self): - # stdin is set to open file object - tf = tempfile.TemporaryFile() - tf.write("pear") - tf.seek(0) - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.exit(sys.stdin.read() == "pear")'], - stdin=tf) - p.wait() - self.assertEqual(p.returncode, 1) - - def test_stdout_pipe(self): - # stdout redirection - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stdout.write("orange")'], - stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), "orange") - - def test_stdout_filedes(self): - # stdout is set to open file descriptor - tf = tempfile.TemporaryFile() - d = tf.fileno() - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stdout.write("orange")'], - stdout=d) - p.wait() - os.lseek(d, 0, 0) - self.assertEqual(os.read(d, 1024), "orange") - - def test_stdout_fileobj(self): - # stdout is set to open file object - tf = tempfile.TemporaryFile() - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stdout.write("orange")'], - stdout=tf) - p.wait() - tf.seek(0) - self.assertEqual(tf.read(), "orange") - - def test_stderr_pipe(self): - # stderr redirection - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stderr.write("strawberry")'], - stderr=subprocess.PIPE) - self.addCleanup(p.stderr.close) - #self.assertStderrEqual(p.stderr.read(), "strawberry") - self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()), - "strawberry") - - def test_stderr_filedes(self): - # stderr is set to open file descriptor - tf = tempfile.TemporaryFile() - d = tf.fileno() - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stderr.write("strawberry")'], - stderr=d) - p.wait() - os.lseek(d, 0, 0) - #self.assertStderrEqual(os.read(d, 1024), "strawberry") - self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)), - "strawberry") - - def test_stderr_fileobj(self): - # stderr is set to open file object - tf = tempfile.TemporaryFile() - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stderr.write("strawberry")'], - stderr=tf) - p.wait() - tf.seek(0) - #self.assertStderrEqual(tf.read(), "strawberry") - self.assertEqual(remove_stderr_debug_decorations(tf.read()), - "strawberry") - - def test_stdout_stderr_pipe(self): - # capture stdout and stderr to the same pipe - p = subprocess.Popen([sys.executable, "-c", - 'import sys;' - 'sys.stdout.write("apple");' - 'sys.stdout.flush();' - 'sys.stderr.write("orange")'], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - self.addCleanup(p.stdout.close) - #self.assertStderrEqual(p.stdout.read(), "appleorange") - output = p.stdout.read() - stripped = remove_stderr_debug_decorations(output) - self.assertEqual(stripped, "appleorange") - - def test_stdout_stderr_file(self): - # capture stdout and stderr to the same open file - tf = tempfile.TemporaryFile() - p = subprocess.Popen([sys.executable, "-c", - 'import sys;' - 'sys.stdout.write("apple");' - 'sys.stdout.flush();' - 'sys.stderr.write("orange")'], - stdout=tf, - stderr=tf) - p.wait() - tf.seek(0) - #self.assertStderrEqual(tf.read(), "appleorange") - output = tf.read() - stripped = remove_stderr_debug_decorations(output) - self.assertEqual(stripped, "appleorange") - - def test_stdout_filedes_of_stdout(self): - # stdout is set to 1 (#1531862). - cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" - rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) - self.assertEqual(rc, 2) - - def test_cwd(self): - tmpdir = tempfile.gettempdir() - # We cannot use os.path.realpath to canonicalize the path, - # since it doesn't expand Tru64 {memb} strings. See bug 1063571. - cwd = os.getcwd() - os.chdir(tmpdir) - tmpdir = os.getcwd() - os.chdir(cwd) - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' - 'sys.stdout.write(os.getcwd())'], - stdout=subprocess.PIPE, - cwd=tmpdir) - self.addCleanup(p.stdout.close) - normcase = os.path.normcase - self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) - - def test_env(self): - newenv = os.environ.copy() - newenv["FRUIT"] = "orange" - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' - 'sys.stdout.write(os.getenv("FRUIT"))'], - stdout=subprocess.PIPE, - env=newenv) - self.assertEqual(p.stdout.read(), "orange") - - def test_communicate_stdin(self): - p = subprocess.Popen([sys.executable, "-c", - 'import sys;' - 'sys.exit(sys.stdin.read() == "pear")'], - stdin=subprocess.PIPE) - p.communicate("pear") - self.assertEqual(p.returncode, 1) - - def test_communicate_stdout(self): - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stdout.write("pineapple")'], - stdout=subprocess.PIPE) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, "pineapple") - self.assertEqual(stderr, None) - - def test_communicate_stderr(self): - p = subprocess.Popen([sys.executable, "-c", - 'import sys; sys.stderr.write("pineapple")'], - stderr=subprocess.PIPE) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, None) - #self.assertStderrEqual(stderr, "pineapple") - self.assertEqual(remove_stderr_debug_decorations(stderr), "pineapple") - - def test_communicate(self): - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' - 'sys.stderr.write("pineapple");' - 'sys.stdout.write(sys.stdin.read())'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - (stdout, stderr) = p.communicate("banana") - self.assertEqual(stdout, "banana") - #self.assertStderrEqual(stderr, "pineapple") - self.assertEqual(remove_stderr_debug_decorations(stderr), - "pineapple") - - # Not available with python-2.6's unittest: Reimplement by - # raising SkipTest from nose - # This test is Linux specific for simplicity to at least have - # some coverage. It is not a platform specific bug. - #@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), - # "Linux specific") - # Test for the fd leak reported in http://bugs.python.org/issue2791. - def test_communicate_pipe_fd_leak(self): - if not os.path.isdir('/proc/%d/fd' % os.getpid()): - raise SkipTest('Linux specific') - fd_directory = '/proc/%d/fd' % os.getpid() - num_fds_before_popen = len(os.listdir(fd_directory)) - p = subprocess.Popen([sys.executable, "-c", "print()"], - stdout=subprocess.PIPE) - p.communicate() - num_fds_after_communicate = len(os.listdir(fd_directory)) - del p - num_fds_after_destruction = len(os.listdir(fd_directory)) - self.assertEqual(num_fds_before_popen, num_fds_after_destruction) - self.assertEqual(num_fds_before_popen, num_fds_after_communicate) - - def test_communicate_returns(self): - # communicate() should return None if no redirection is active - p = subprocess.Popen([sys.executable, "-c", - "import sys; sys.exit(47)"]) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, None) - self.assertEqual(stderr, None) - - def test_communicate_pipe_buf(self): - # communicate() with writes larger than pipe_buf - # This test will probably deadlock rather than fail, if - # communicate() does not work properly. - x, y = os.pipe() - if mswindows: - pipe_buf = 512 - else: - pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") - os.close(x) - os.close(y) - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' - 'sys.stdout.write(sys.stdin.read(47));' - 'sys.stderr.write("xyz"*%d);' - 'sys.stdout.write(sys.stdin.read())' % pipe_buf], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - string_to_write = "abc"*pipe_buf - (stdout, stderr) = p.communicate(string_to_write) - self.assertEqual(stdout, string_to_write) - - def test_writes_before_communicate(self): - # stdin.write before communicate() - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' - 'sys.stdout.write(sys.stdin.read())'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - p.stdin.write("banana") - (stdout, stderr) = p.communicate("split") - self.assertEqual(stdout, "bananasplit") - #self.assertStderrEqual(stderr, "") - self.assertEqual(remove_stderr_debug_decorations(stderr), "") - - def test_universal_newlines(self): - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' + SETBINARY + - 'sys.stdout.write("line1\\n");' - 'sys.stdout.flush();' - 'sys.stdout.write("line2\\r");' - 'sys.stdout.flush();' - 'sys.stdout.write("line3\\r\\n");' - 'sys.stdout.flush();' - 'sys.stdout.write("line4\\r");' - 'sys.stdout.flush();' - 'sys.stdout.write("\\nline5");' - 'sys.stdout.flush();' - 'sys.stdout.write("\\nline6");'], - stdout=subprocess.PIPE, - universal_newlines=1) - self.addCleanup(p.stdout.close) - stdout = p.stdout.read() - if hasattr(file, 'newlines'): - # Interpreter with universal newline support - self.assertEqual(stdout, - "line1\nline2\nline3\nline4\nline5\nline6") - else: - # Interpreter without universal newline support - self.assertEqual(stdout, - "line1\nline2\rline3\r\nline4\r\nline5\nline6") - - def test_universal_newlines_communicate(self): - # universal newlines through communicate() - p = subprocess.Popen([sys.executable, "-c", - 'import sys,os;' + SETBINARY + - 'sys.stdout.write("line1\\n");' - 'sys.stdout.flush();' - 'sys.stdout.write("line2\\r");' - 'sys.stdout.flush();' - 'sys.stdout.write("line3\\r\\n");' - 'sys.stdout.flush();' - 'sys.stdout.write("line4\\r");' - 'sys.stdout.flush();' - 'sys.stdout.write("\\nline5");' - 'sys.stdout.flush();' - 'sys.stdout.write("\\nline6");'], - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - universal_newlines=1) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - (stdout, stderr) = p.communicate() - if hasattr(file, 'newlines'): - # Interpreter with universal newline support - self.assertEqual(stdout, - "line1\nline2\nline3\nline4\nline5\nline6") - else: - # Interpreter without universal newline support - self.assertEqual(stdout, - "line1\nline2\rline3\r\nline4\r\nline5\nline6") - - def test_no_leaking(self): - if not test_support: - raise SkipTest("No test_support module available.") - - # Make sure we leak no resources - if not mswindows: - max_handles = 1026 # too much for most UNIX systems - else: - max_handles = 2050 # too much for (at least some) Windows setups - handles = [] - try: - for i in range(max_handles): - try: - handles.append(os.open(test_support.TESTFN, - os.O_WRONLY | os.O_CREAT)) - except OSError, e: - if e.errno != errno.EMFILE: - raise - break - else: - # python-2.3 unittest doesn't have skipTest. Reimplement with nose - #self.skipTest("failed to reach the file descriptor limit " - # "(tried %d)" % max_handles) - raise SkipTest("failed to reach the file descriptor limit " - "(tried %d)" % max_handles) - - # Close a couple of them (should be enough for a subprocess) - for i in range(10): - os.close(handles.pop()) - # Loop creating some subprocesses. If one of them leaks some fds, - # the next loop iteration will fail by reaching the max fd limit. - for i in range(15): - p = subprocess.Popen([sys.executable, "-c", - "import sys;" - "sys.stdout.write(sys.stdin.read())"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - data = p.communicate("lime")[0] - self.assertEqual(data, "lime") - finally: - for h in handles: - os.close(h) - - def test_list2cmdline(self): - self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), - '"a b c" d e') - self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), - 'ab\\"c \\ d') - self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), - 'ab\\"c " \\\\" d') - self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), - 'a\\\\\\b "de fg" h') - self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), - 'a\\\\\\"b c d') - self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), - '"a\\\\b c" d e') - self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), - '"a\\\\b\\ c" d e') - self.assertEqual(subprocess.list2cmdline(['ab', '']), - 'ab ""') - - - def test_poll(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(1)"]) - count = 0 - while p.poll() is None: - time.sleep(0.1) - count += 1 - # We expect that the poll loop probably went around about 10 times, - # but, based on system scheduling we can't control, it's possible - # poll() never returned None. It "should be" very rare that it - # didn't go around at least twice. - #self.assertGreaterEqual(count, 2) - self.assert_(count >= 2) - # Subsequent invocations should just return the returncode - self.assertEqual(p.poll(), 0) - - - def test_wait(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(2)"]) - self.assertEqual(p.wait(), 0) - # Subsequent invocations should just return the returncode - self.assertEqual(p.wait(), 0) - - - def test_invalid_bufsize(self): - # an invalid type of the bufsize argument should raise - # TypeError. - #with self.assertRaises(TypeError): - try: - subprocess.Popen([sys.executable, "-c", "pass"], "orange") - except TypeError: - pass - else: - self.fail("Expected TypeError") - - def test_leaking_fds_on_error(self): - # see bug #5179: Popen leaks file descriptors to PIPEs if - # the child fails to execute; this will eventually exhaust - # the maximum number of open fds. 1024 seems a very common - # value for that limit, but Windows has 2048, so we loop - # 1024 times (each call leaked two fds). - for i in range(1024): - # Windows raises IOError. Others raise OSError. - #with self.assertRaises(EnvironmentError) as c: - try: - subprocess.Popen(['nonexisting_i_hope'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - #if c.exception.errno != 2: # ignore "no such file" - # raise c.exception - # Windows raises IOError - except EnvironmentError, err: - if err.errno not in (errno.ENOENT, errno.EACCES): # ignore "no such file" - raise err - except: - self.fail("Expected EnvironmentError") - else: - self.fail("Expected EnvironmentError") - - def test_handles_closed_on_exception(self): - # If CreateProcess exits with an error, ensure the - # duplicate output handles are released - ifhandle, ifname = mkstemp() - ofhandle, ofname = mkstemp() - efhandle, efname = mkstemp() - try: - subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, - stderr=efhandle) - except OSError: - os.close(ifhandle) - os.remove(ifname) - os.close(ofhandle) - os.remove(ofname) - os.close(efhandle) - os.remove(efname) - self.assert_(not os.path.exists(ifname)) - self.assert_(not os.path.exists(ofname)) - self.assert_(not os.path.exists(efname)) - - def test_communicate_epipe(self): - # Issue 10963: communicate() should hide EPIPE - p = subprocess.Popen([sys.executable, "-c", 'pass'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - p.communicate("x" * 2**20) - - def test_communicate_epipe_only_stdin(self): - # Issue 10963: communicate() should hide EPIPE - p = subprocess.Popen([sys.executable, "-c", 'pass'], - stdin=subprocess.PIPE) - self.addCleanup(p.stdin.close) - time.sleep(2) - p.communicate("x" * 2**20) - -# context manager -class _SuppressCoreFiles(object): - """Try to prevent core files from being created.""" - old_limit = None - - def __enter__(self): - """Try to save previous ulimit, then set it to (0, 0).""" - try: - import resource - self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) - resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) - except (ImportError, ValueError, resource.error): - pass - - if sys.platform == 'darwin': - # Check if the 'Crash Reporter' on OSX was configured - # in 'Developer' mode and warn that it will get triggered - # when it is. - # - # This assumes that this context manager is used in tests - # that might trigger the next manager. - value = subprocess.Popen(['/usr/bin/defaults', 'read', - 'com.apple.CrashReporter', 'DialogType'], - stdout=subprocess.PIPE).communicate()[0] - if value.strip() == 'developer': - print "this tests triggers the Crash Reporter, that is intentional" - sys.stdout.flush() - - def __exit__(self, *args): - """Return core file behavior to default.""" - if self.old_limit is None: - return - try: - import resource - resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) - except (ImportError, ValueError, resource.error): - pass - - # python-2.3 unittest doesn't have skipUnless. Reimplement as SkipTest from nose - #@unittest.skipUnless(hasattr(signal, 'SIGALRM'), - # "Requires signal.SIGALRM") - def test_communicate_eintr(self): - if not hasattr(signal, 'SIGALRM'): - raise SkipTest('Requires signal.SIGALRM') - # Issue #12493: communicate() should handle EINTR - def handler(signum, frame): - pass - old_handler = signal.signal(signal.SIGALRM, handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) - - # the process is running for 2 seconds - args = [sys.executable, "-c", 'import time; time.sleep(2)'] - for stream in ('stdout', 'stderr'): - kw = {stream: subprocess.PIPE} - #with subprocess.Popen(args, **kw) as process: - try: - process = subprocess.Popen(args, **kw) - signal.alarm(1) - # communicate() will be interrupted by SIGALRM - process.communicate() - finally: - process.close() - - -# Not available with python-2.3's unittest. Reimplement with SkipTest from -# nose -#@unittest.skipIf(mswindows, "POSIX specific tests") -class POSIXProcessTestCase(BaseTestCase): - def setUp(self): - if mswindows: - raise SkipTest('POSIX specific tests') - - def test_exceptions(self): - # caught & re-raised exceptions - #with self.assertRaises(OSError) as c: - try: - p = subprocess.Popen([sys.executable, "-c", ""], - cwd="/this/path/does/not/exist") - # The attribute child_traceback should contain "os.chdir" somewhere. - #self.assertIn("os.chdir", c.exception.child_traceback) - except OSError, e: - self.assertNotEqual(e.child_traceback.find("os.chdir"), -1) - except: - self.fail("Expected OSError") - else: - self.fail("Expected OSError") - - def _suppress_core_files(self): - """Try to prevent core files from being created. - Returns previous ulimit if successful, else None. - """ - try: - import resource - old_limit = resource.getrlimit(resource.RLIMIT_CORE) - resource.setrlimit(resource.RLIMIT_CORE, (0,0)) - return old_limit - except (ImportError, ValueError, resource.error): - return None - - def _unsuppress_core_files(self, old_limit): - """Return core file behavior to default.""" - if old_limit is None: - return - try: - import resource - resource.setrlimit(resource.RLIMIT_CORE, old_limit) - except (ImportError, ValueError, resource.error): - return - - def test_run_abort(self): - # returncode handles signal termination - #with _SuppressCoreFiles(): - old_limit = self._suppress_core_files() - try: - p = subprocess.Popen([sys.executable, "-c", - "import os; os.abort()"]) - finally: - self._unsuppress_core_files(old_limit) - - #p.wait() - p.wait() - self.assertEqual(-p.returncode, signal.SIGABRT) - - def test_preexec(self): - # preexec function - p = subprocess.Popen([sys.executable, "-c", - "import sys, os;" - "sys.stdout.write(os.getenv('FRUIT'))"], - stdout=subprocess.PIPE, - preexec_fn=lambda: os.putenv("FRUIT", "apple")) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), "apple") - - def test_args_string(self): - # args is a string - f, fname = mkstemp() - os.write(f, "#!/bin/sh\n") - os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % - sys.executable) - os.close(f) - # 0o is not available in python2.5 - #os.chmod(fname, 0o700) - os.chmod(fname, 0700) - p = subprocess.Popen(fname) - p.wait() - os.remove(fname) - self.assertEqual(p.returncode, 47) - - def test_invalid_args(self): - # invalid arguments should raise ValueError - self.assertRaises(ValueError, subprocess.call, - [sys.executable, "-c", - "import sys; sys.exit(47)"], - startupinfo=47) - self.assertRaises(ValueError, subprocess.call, - [sys.executable, "-c", - "import sys; sys.exit(47)"], - creationflags=47) - - def test_shell_sequence(self): - # Run command through the shell (sequence) - newenv = os.environ.copy() - newenv["FRUIT"] = "apple" - p = subprocess.Popen(["echo $FRUIT"], shell=1, - stdout=subprocess.PIPE, - env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(), "apple") - - def test_shell_string(self): - # Run command through the shell (string) - newenv = os.environ.copy() - newenv["FRUIT"] = "apple" - p = subprocess.Popen("echo $FRUIT", shell=1, - stdout=subprocess.PIPE, - env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(), "apple") - - def test_call_string(self): - # call() function with string argument on UNIX - f, fname = mkstemp() - os.write(f, "#!/bin/sh\n") - os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % - sys.executable) - os.close(f) - os.chmod(fname, 0700) - rc = subprocess.call(fname) - os.remove(fname) - self.assertEqual(rc, 47) - - def test_specific_shell(self): - # Issue #9265: Incorrect name passed as arg[0]. - shells = [] - for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: - for name in ['bash', 'ksh']: - sh = os.path.join(prefix, name) - if os.path.isfile(sh): - shells.append(sh) - if not shells: # Will probably work for any shell but csh. - - # skipTest unavailable on python<2.7 reimplement with nose - #self.skipTest("bash or ksh required for this test") - raise SkipTest("bash or ksh required for this test") - sh = '/bin/sh' - if os.path.isfile(sh) and not os.path.islink(sh): - # Test will fail if /bin/sh is a symlink to csh. - shells.append(sh) - for sh in shells: - p = subprocess.Popen("echo $0", executable=sh, shell=True, - stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(), sh) - - def _kill_process(self, method, *args): - # Do not inherit file handles from the parent. - # It should fix failures on some platforms. - p = subprocess.Popen([sys.executable, "-c", """if 1: - import sys, time - sys.stdout.write('x\\n') - sys.stdout.flush() - time.sleep(30) - """], - close_fds=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - getattr(p, method)(*args) - return p - - # These are still hanging with x86_64 Fedora 12 (python-2.6) subprocess - # backport from current trunk run under nosetests - - def test_send_signal(self): - #if hang DISABLED #2777 - p = self._kill_process('send_signal', signal.SIGINT) - _, stderr = p.communicate() - self.assert_('KeyboardInterrupt' in stderr) - self.assertNotEqual(p.wait(), 0) - - def test_kill(self): - #if hang DISABLED #2777 - p = self._kill_process('kill') - _, stderr = p.communicate() - self.assertStderrEqual(stderr, '') - self.assertEqual(p.wait(), -signal.SIGKILL) - - def test_terminate(self): - #if hang DISABLED #2777 - p = self._kill_process('terminate') - _, stderr = p.communicate() - self.assertStderrEqual(stderr, '') - self.assertEqual(p.wait(), -signal.SIGTERM) - - def check_close_std_fds(self, fds): - # Issue #9905: test that subprocess pipes still work properly with - # some standard fds closed - stdin = 0 - newfds = [] - for a in fds: - b = os.dup(a) - newfds.append(b) - if a == 0: - stdin = b - try: - for fd in fds: - os.close(fd) - out, err = subprocess.Popen([sys.executable, "-c", - 'import sys;' - 'sys.stdout.write("apple");' - 'sys.stdout.flush();' - 'sys.stderr.write("orange")'], - stdin=stdin, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE).communicate() - err = re.sub(r"\[\d+ refs\]\r?\n?$", "", err).strip() - self.assertEqual((out, err), ('apple', 'orange')) - finally: - for b, a in zip(newfds, fds): - os.dup2(b, a) - for b in newfds: - os.close(b) - - def test_close_fd_0(self): - self.check_close_std_fds([0]) - - def test_close_fd_1(self): - self.check_close_std_fds([1]) - - def test_close_fd_2(self): - self.check_close_std_fds([2]) - - def test_close_fds_0_1(self): - self.check_close_std_fds([0, 1]) - - def test_close_fds_0_2(self): - self.check_close_std_fds([0, 2]) - - def test_close_fds_1_2(self): - self.check_close_std_fds([1, 2]) - - def test_close_fds_0_1_2(self): - # Issue #10806: test that subprocess pipes still work properly with - # all standard fds closed. - self.check_close_std_fds([0, 1, 2]) - - def check_swap_fds(self, stdin_no, stdout_no, stderr_no): - # open up some temporary files - temps = [mkstemp() for i in range(3)] - temp_fds = [fd for fd, fname in temps] - try: - # unlink the files -- we won't need to reopen them - for fd, fname in temps: - os.unlink(fname) - - # save a copy of the standard file descriptors - saved_fds = [os.dup(fd) for fd in range(3)] - try: - # duplicate the temp files over the standard fd's 0, 1, 2 - for fd, temp_fd in enumerate(temp_fds): - os.dup2(temp_fd, fd) - - # write some data to what will become stdin, and rewind - os.write(stdin_no, "STDIN") - os.lseek(stdin_no, 0, 0) - - # now use those files in the given order, so that subprocess - # has to rearrange them in the child - p = subprocess.Popen([sys.executable, "-c", - 'import sys; got = sys.stdin.read();' - 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], - stdin=stdin_no, - stdout=stdout_no, - stderr=stderr_no) - p.wait() - - for fd in temp_fds: - os.lseek(fd, 0, 0) - - out = os.read(stdout_no, 1024) - err = re.sub(r"\[\d+ refs\]\r?\n?$", "", os.read(stderr_no, 1024)).strip() - finally: - for std, saved in enumerate(saved_fds): - os.dup2(saved, std) - os.close(saved) - - self.assertEqual(out, "got STDIN") - self.assertEqual(err, "err") - - finally: - for fd in temp_fds: - os.close(fd) - - # When duping fds, if there arises a situation where one of the fds is - # either 0, 1 or 2, it is possible that it is overwritten (#12607). - # This tests all combinations of this. - def test_swap_fds(self): - self.check_swap_fds(0, 1, 2) - self.check_swap_fds(0, 2, 1) - self.check_swap_fds(1, 0, 2) - self.check_swap_fds(1, 2, 0) - self.check_swap_fds(2, 0, 1) - self.check_swap_fds(2, 1, 0) - - def test_wait_when_sigchild_ignored(self): - # NOTE: sigchild_ignore.py may not be an effective test on all OSes. - if not test_support: - raise SkipTest("No test_support module available.") - sigchild_ignore = test_support.findfile(os.path.join("subprocessdata", - "sigchild_ignore.py")) - p = subprocess.Popen([sys.executable, sigchild_ignore], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" - " non-zero with this error:\n%s" % stderr) - - - def test_zombie_fast_process_del(self): - # Issue #12650: on Unix, if Popen.__del__() was called before the - # process exited, it wouldn't be added to subprocess._active, and would - # remain a zombie. - # spawn a Popen, and delete its reference before it exits - p = subprocess.Popen([sys.executable, "-c", - 'import sys, time;' - 'time.sleep(0.2)'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - ident = id(p) - pid = p.pid - del p - # check that p is in the active processes list - # assertIn not in python< 2.7 - #self.assertIn(ident, [id(o) for o in subprocess._active]) - self.assert_(ident in [id(o) for o in subprocess._active]) - - def test_leak_fast_process_del_killed(self): - # Issue #12650: on Unix, if Popen.__del__() was called before the - # process exited, and the process got killed by a signal, it would never - # be removed from subprocess._active, which triggered a FD and memory - # leak. - # spawn a Popen, delete its reference and kill it - p = subprocess.Popen([sys.executable, "-c", - 'import time;' - 'time.sleep(3)'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - ident = id(p) - pid = p.pid - del p - os.kill(pid, signal.SIGKILL) - # check that p is in the active processes list - # assertIn not in python < 2.7 - self.assert_(ident in [id(o) for o in subprocess._active]) - #self.assertIn(ident, [id(o) for o in subprocess._active]) - - # let some time for the process to exit, and create a new Popen: this - # should trigger the wait() of p - time.sleep(0.2) - #with self.assertRaises(EnvironmentError) as c: - try: - try: - proc = subprocess.Popen(['nonexisting_i_hope'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - finally: - pass - except EnvironmentError: - pass - except: - self.fail('Expected EnvironmentError') - else: - self.fail('Expected EnvironmentError') - # p should have been wait()ed on, and removed from the _active list - #self.assertRaises(OSError, os.waitpid, pid, 0) - try: - os.waitpid(pid, 0) - except OSError: - pass - except: - self.fail('Expected OSError') - else: - self.fail('Expected OSError') - #self.assertNotIn(ident, [id(o) for o in subprocess._active]) - self.assert_(ident not in [id(o) for o in subprocess._active]) - - def test_pipe_cloexec(self): - # Issue 12786: check that the communication pipes' FDs are set CLOEXEC, - # and are not inherited by another child process. - p1 = subprocess.Popen([sys.executable, "-c", - 'import os;' - 'os.read(0, 1)' - ], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - p2 = subprocess.Popen([sys.executable, "-c", """if True: - import os, errno, sys - for fd in %r: - try: - os.close(fd) - except OSError, e: - if e.errno != errno.EBADF: - raise - else: - sys.exit(1) - sys.exit(0) - """ % [f.fileno() for f in (p1.stdin, p1.stdout, - p1.stderr)] - ], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, close_fds=False) - p1.communicate('foo') - _, stderr = p2.communicate() - - self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr)) - - -# Not available with python-2.3's unittest, reimplement with SkipTest from -# nose -##@unittest.skipUnless(mswindows, "Windows specific tests") -class Win32ProcessTestCase(BaseTestCase): - def setUp(self): - if not mswindows: - raise SkipTest('Windows specific tests') - - def test_startupinfo(self): - # startupinfo argument - # We uses hardcoded constants, because we do not want to - # depend on win32all. - STARTF_USESHOWWINDOW = 1 - SW_MAXIMIZE = 3 - startupinfo = subprocess.STARTUPINFO() - startupinfo.dwFlags = STARTF_USESHOWWINDOW - startupinfo.wShowWindow = SW_MAXIMIZE - # Since Python is a console process, it won't be affected - # by wShowWindow, but the argument should be silently - # ignored - subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], - startupinfo=startupinfo) - - def test_creationflags(self): - # creationflags argument - CREATE_NEW_CONSOLE = 16 - sys.stderr.write(" a DOS box should flash briefly ...\n") - subprocess.call(sys.executable + - ' -c "import time; time.sleep(0.25)"', - creationflags=CREATE_NEW_CONSOLE) - - def test_invalid_args(self): - # invalid arguments should raise ValueError - self.assertRaises(ValueError, subprocess.call, - [sys.executable, "-c", - "import sys; sys.exit(47)"], - preexec_fn=lambda: 1) - self.assertRaises(ValueError, subprocess.call, - [sys.executable, "-c", - "import sys; sys.exit(47)"], - stdout=subprocess.PIPE, - close_fds=True) - - def test_close_fds(self): - # close file descriptors - rc = subprocess.call([sys.executable, "-c", - "import sys; sys.exit(47)"], - close_fds=True) - self.assertEqual(rc, 47) - - def test_shell_sequence(self): - # Run command through the shell (sequence) - newenv = os.environ.copy() - newenv["FRUIT"] = "physalis" - p = subprocess.Popen(["set"], shell=1, - stdout=subprocess.PIPE, - env=newenv) - self.addCleanup(p.stdout.close) - #self.assertIn("physalis", p.stdout.read()) - self.assert_("physalis" in p.stdout.read()) - - def test_shell_string(self): - # Run command through the shell (string) - newenv = os.environ.copy() - newenv["FRUIT"] = "physalis" - p = subprocess.Popen("set", shell=1, - stdout=subprocess.PIPE, - env=newenv) - self.addCleanup(p.stdout.close) - #self.assertIn("physalis", p.stdout.read()) - self.assert_("physalis" in p.stdout.read()) - - def test_call_string(self): - # call() function with string argument on Windows - rc = subprocess.call(sys.executable + - ' -c "import sys; sys.exit(47)"') - self.assertEqual(rc, 47) - - def _kill_process(self, method, *args): - # Some win32 buildbot raises EOFError if stdin is inherited - p = subprocess.Popen([sys.executable, "-c", """if 1: - import sys, time - sys.stdout.write('x\\n') - sys.stdout.flush() - time.sleep(30) - """], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - getattr(p, method)(*args) - _, stderr = p.communicate() - self.assertStderrEqual(stderr, '') - returncode = p.wait() - self.assertNotEqual(returncode, 0) - - def test_send_signal(self): - #if hang DISABLED #2777 - self._kill_process('send_signal', signal.SIGTERM) - - def test_kill(self): - #if hang DISABLED #2777 - self._kill_process('kill') - - def test_terminate(self): - #if hang DISABLED #2777 - self._kill_process('terminate') - - -# Not available with python-2.3's unittest. reimplement with SkipTest from -# nose -#@unittest.skipUnless(getattr(subprocess, '_has_poll', False), -# "poll system call not supported") -class ProcessTestCaseNoPoll(ProcessTestCase): - def setUp(self): - if not getattr(subprocess, '_has_poll', False): - raise SkipTest('poll system call not supported') - subprocess._has_poll = False - ProcessTestCase.setUp(self) - - def tearDown(self): - subprocess._has_poll = True - ProcessTestCase.tearDown(self) - - -class HelperFunctionTests(unittest.TestCase): - # Not available with python-2.3's unittest. reimplement with SkipTest from - # nose - #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") - def test_eintr_retry_call(self): - if mswindows: - raise SkipTest('errno and EINTR make no sense on windows') - record_calls = [] - def fake_os_func(*args): - record_calls.append(args) - if len(record_calls) == 2: - raise OSError(errno.EINTR, "fake interrupted system call") - # reversed() is not available in python-2.3 - args = list(args) - args.reverse() - return tuple(args) - - self.assertEqual((999, 256), - subprocess._eintr_retry_call(fake_os_func, 256, 999)) - self.assertEqual([(256, 999)], record_calls) - # This time there will be an EINTR so it will loop once. - self.assertEqual((666,), - subprocess._eintr_retry_call(fake_os_func, 666)) - self.assertEqual([(256, 999), (666,), (666,)], record_calls) - - -# SkipUnless is not available on python < 2.7, reimplement with nose -#@unittest.skipUnless(mswindows, "mswindows only") -class CommandsWithSpaces (BaseTestCase): - - def setUp(self): - if not mswindows: - raise SkipTest('mswindows only') - - super(CommandsWithSpaces, self).setUp() - f, fname = mkstemp(".py", "te st") - self.fname = fname.lower () - os.write(f, "import sys;" - "sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" - ) - os.close(f) - - def tearDown(self): - os.remove(self.fname) - super(CommandsWithSpaces, self).tearDown() - - def with_spaces(self, *args, **kwargs): - kwargs['stdout'] = subprocess.PIPE - p = subprocess.Popen(*args, **kwargs) - self.addCleanup(p.stdout.close) - self.assertEqual( - p.stdout.read ().decode("mbcs"), - "2 [%r, 'ab cd']" % self.fname - ) - - def test_shell_string_with_spaces(self): - # call() function with string argument with spaces on Windows - self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, - "ab cd"), shell=1) - - def test_shell_sequence_with_spaces(self): - # call() function with sequence argument with spaces on Windows - self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) - - def test_noshell_string_with_spaces(self): - # call() function with string argument with spaces on Windows - self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, - "ab cd")) - - def test_noshell_sequence_with_spaces(self): - # call() function with sequence argument with spaces on Windows - self.with_spaces([sys.executable, self.fname, "ab cd"]) - - -# We're using nosetests so we don't need this. This just leads to tests being -# run twice -#def test_main(): -# unit_tests = (ProcessTestCase, -# POSIXProcessTestCase, -# Win32ProcessTestCase, -# ProcessTestCaseNoPoll, -# HelperFunctionTests -# ) -# -# test_support.run_unittest(*unit_tests) -# test_support.reap_children() -# -#if __name__ == "__main__": -# test_main() diff --git a/kitchen3/docs/api-overview.rst b/kitchen3/docs/api-overview.rst index dda56fe..e53a94d 100644 --- a/kitchen3/docs/api-overview.rst +++ b/kitchen3/docs/api-overview.rst @@ -16,9 +16,6 @@ that may drag in more dependencies can be found on the `project webpage`_ api-collections api-iterutils api-versioning - api-pycompat24 - api-pycompat25 - api-pycompat27 api-exceptions .. _`project webpage`: https://fedorahosted.org/kitchen diff --git a/kitchen3/docs/api-pycompat24.rst b/kitchen3/docs/api-pycompat24.rst deleted file mode 100644 index a3247b6..0000000 --- a/kitchen3/docs/api-pycompat24.rst +++ /dev/null @@ -1,34 +0,0 @@ -======================= -Python 2.4 Compatibiity -======================= - - -------------------- -Sets for python-2.3 -------------------- - -.. automodule:: kitchen.pycompat24.sets -.. autofunction:: kitchen.pycompat24.sets.add_builtin_set - ----------------------------------- -Partial new style base64 interface ----------------------------------- - -.. automodule:: kitchen.pycompat24.base64 - :members: - ----------- -Subprocess ----------- - -.. seealso:: - - :mod:`kitchen.pycompat27.subprocess` - Kitchen includes the python-2.7 version of subprocess which has a new - function, :func:`~kitchen.pycompat27.subprocess.check_output`. When - you import :mod:`pycompat24.subprocess` you will be getting the - python-2.7 version of subprocess rather than the 2.4 version (where - subprocess first appeared). This choice was made so that we can - concentrate our efforts on keeping the single version of subprocess up - to date rather than working on a 2.4 version that very few people - would need specifically. diff --git a/kitchen3/docs/api-pycompat25.rst b/kitchen3/docs/api-pycompat25.rst deleted file mode 100644 index 2323c2f..0000000 --- a/kitchen3/docs/api-pycompat25.rst +++ /dev/null @@ -1,8 +0,0 @@ -======================== -Python 2.5 Compatibility -======================== - -.. automodule:: kitchen.pycompat25 - -.. automodule:: kitchen.pycompat25.collections.defaultdict - diff --git a/kitchen3/docs/api-pycompat27.rst b/kitchen3/docs/api-pycompat27.rst deleted file mode 100644 index 6ef6db1..0000000 --- a/kitchen3/docs/api-pycompat27.rst +++ /dev/null @@ -1,35 +0,0 @@ -======================== -Python 2.7 Compatibility -======================== - -.. module:: kitchen.pycompat27.subprocess - --------------------------- -Subprocess from Python 2.7 --------------------------- - -The :mod:`subprocess` module included here is a direct import from -python-2.7's |stdlib|_. You can access it via:: - - >>> from kitchen.pycompat27 import subprocess - -The motivation for including this module is that various API changing -improvements have been made to subprocess over time. The following is a list -of the known changes to :mod:`subprocess` with the python version they were -introduced in: - -==================================== === -New API Feature Ver -==================================== === -:exc:`subprocess.CalledProcessError` 2.5 -:func:`subprocess.check_call` 2.5 -:func:`subprocess.check_output` 2.7 -:meth:`subprocess.Popen.send_signal` 2.6 -:meth:`subprocess.Popen.terminate` 2.6 -:meth:`subprocess.Popen.kill` 2.6 -==================================== === - -.. seealso:: - - The stdlib :mod:`subprocess` documenation - For complete documentation on how to use subprocess diff --git a/kitchen3/tests/subprocessdata/sigchild_ignore.py b/kitchen3/tests/subprocessdata/sigchild_ignore.py deleted file mode 100644 index 5b6dd08..0000000 --- a/kitchen3/tests/subprocessdata/sigchild_ignore.py +++ /dev/null @@ -1,11 +0,0 @@ -import os -import signal, sys -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) - -from kitchen.pycompat27.subprocess import _subprocess as subprocess - -# On Linux this causes os.waitpid to fail with OSError as the OS has already -# reaped our child process. The wait() passing the OSError on to the caller -# and causing us to exit with an error is what we are testing against. -signal.signal(signal.SIGCHLD, signal.SIG_IGN) -subprocess.Popen([sys.executable, '-c', 'print("albatross")']).wait() diff --git a/kitchen3/tests/test__all__.py b/kitchen3/tests/test__all__.py index d25cb3f..7ceef8b 100644 --- a/kitchen3/tests/test__all__.py +++ b/kitchen3/tests/test__all__.py @@ -4,8 +4,6 @@ from nose import tools import os import types import warnings -from kitchen.pycompat24.sets import add_builtin_set -add_builtin_set() def logit(msg): log = open('/var/tmp/test.log', 'a') diff --git a/kitchen3/tests/test_collections.py b/kitchen3/tests/test_collections.py index e3da84b..12a6d04 100644 --- a/kitchen3/tests/test_collections.py +++ b/kitchen3/tests/test_collections.py @@ -3,9 +3,6 @@ import unittest from nose import tools -from kitchen.pycompat24.sets import add_builtin_set -add_builtin_set() - from kitchen import collections def test_strict_dict_get_set(): diff --git a/kitchen3/tests/test_deprecation_py3.py b/kitchen3/tests/test_deprecation_py3.py deleted file mode 100644 index f03c0df..0000000 --- a/kitchen3/tests/test_deprecation_py3.py +++ /dev/null @@ -1,65 +0,0 @@ -# -*- coding: utf-8 -*- - -from nose import tools - -import sys -import warnings - -import importlib -from kitchen.pycompat25.collections import defaultdict - -class TestPendingDeprecationModules(object): - def __init__(self): - kitchen_path = 'kitchen' - collections_path = 'kitchen/collections' - pycompat24_path = 'kitchen/pycompat24' - pycompat25_path = 'kitchen/pycompat25' - pycompat27_path = 'kitchen/pycompat27' - - self.module_data = ( - ('strictdict', 'kitchen.collections.strictdict', collections_path), - ('pycompat24', 'kitchen.pycompat24', kitchen_path), - ('base64', 'kitchen.pycompat24.base64', pycompat24_path), - ('sets', 'kitchen.pycompat24.sets', pycompat24_path), - ('subprocess', 'kitchen.pycompat24.subprocess', pycompat24_path), - ('pycompat25', 'kitchen.pycompat25', kitchen_path), - ('collections', 'kitchen.pycompat25.collections', pycompat25_path), - ('pycompat27', 'kitchen.pycompat27', kitchen_path), - ('subprocess', 'kitchen.pycompat27.subprocess', pycompat27_path), - ) - - def setUp(self): - for module in sys.modules.values(): - if hasattr(module, '__warningregistry__'): - del module.__warningregistry__ - - def check_modules(self, module_name, module_fqn, module_path): - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - # imp.load_module will load even if it has already been loaded. - # We need to ensure that happens in order to trigger the - # deprecation warnings - importlib.find_loader(module_fqn, module_path).load_module() - warning_raised = False - for warning in (e.message for e in w): - if isinstance(warning, PendingDeprecationWarning) and \ - ('%s is deprecated' % module_name) in warning.args[0]: - warning_raised = True - break - tools.assert_true(warning_raised, msg='%s did not raise a PendingDeprecationWarning' % module_fqn) - - def test_modules(self): - for mod in self.module_data: - yield self.check_modules, mod[0], mod[1], mod[2] - - def test_defaultdict(self): - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always') - defaultdict() - warning_raised = False - for warning in (e.message for e in w): - if isinstance(warning, PendingDeprecationWarning) and \ - ('defaultdict is deprecated') in warning.args[0]: - warning_raised = True - break - tools.assert_true(warning_raised, msg='kitchen.pycompat25.collections.defaultdict did not raise a PendingDeprecationWarning') diff --git a/kitchen3/tests/test_pycompat.py b/kitchen3/tests/test_pycompat.py deleted file mode 100644 index 50a059b..0000000 --- a/kitchen3/tests/test_pycompat.py +++ /dev/null @@ -1,25 +0,0 @@ -# -*- coding: utf-8 -*- -# -import unittest -from nose import tools - -class TestUsableModules(unittest.TestCase): - def test_subprocess(self): - '''Test that importing subprocess as a module works - ''' - try: - from kitchen.pycompat24.subprocess import Popen - except ImportError: - tools.ok_(False, 'Unable to import pycompat24.subprocess as a module') - try: - from kitchen.pycompat27.subprocess import Popen - except ImportError: - tools.ok_(False, 'Unable to import pycompat27.subprocess as a module') - - def test_base64(self): - '''Test that importing base64 as a module works - ''' - try: - from kitchen.pycompat24.base64 import b64encode - except ImportError: - tools.ok_(False, 'Unable to import pycompat24.base64 as a module') diff --git a/setup.py b/setup.py index 4813cae..e1a5c48 100755 --- a/setup.py +++ b/setup.py @@ -15,13 +15,6 @@ if sys.version_info[0] == 2: 'kitchen.iterutils', 'kitchen.collections', 'kitchen.text', - 'kitchen.pycompat24', - 'kitchen.pycompat24.base64', - 'kitchen.pycompat24.sets', - 'kitchen.pycompat25', - 'kitchen.pycompat25.collections', - 'kitchen.pycompat27', - 'kitchen.pycompat27.subprocess', ] elif sys.version_info[0] == 3: source_dir = 'kitchen3' @@ -32,13 +25,6 @@ elif sys.version_info[0] == 3: 'kitchen.iterutils', 'kitchen.collections', 'kitchen.text', - 'kitchen.pycompat24', - 'kitchen.pycompat24.base64', - 'kitchen.pycompat24.sets', - 'kitchen.pycompat25', - 'kitchen.pycompat25.collections', - 'kitchen.pycompat27', - 'kitchen.pycompat27.subprocess', ] else: raise NotImplementedError("Python version unsupported %r" % sys.version)