kitchen/debian/patches/remove_compat_layers

2523 lines
99 KiB
Plaintext
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 727e0eb7f33edfca9f1d18997fb2766a2dd3c9fd Mon Sep 17 00:00:00 2001
From: Simon Chopin <chopin.simon@gmail.com>
Date: Thu, 8 Oct 2015 09:26:23 -0700
Subject: Remove the pycompat* submodules
These compatibility layers are not needed in Debian, as we already ship the latest runtime version.
Forwarded: not-needed
Last-Update: 2013-04-30
Patch-Name: remove_compat_layers
---
kitchen2/docs/api-overview.rst | 3 -
kitchen2/docs/api-pycompat24.rst | 34 -
kitchen2/docs/api-pycompat25.rst | 8 -
kitchen2/docs/api-pycompat27.rst | 35 -
kitchen2/kitchen/text/converters.py | 3 -
kitchen2/kitchen/text/misc.py | 3 -
kitchen2/tests/subprocessdata/sigchild_ignore.py | 11 -
kitchen2/tests/test__all__.py | 2 -
kitchen2/tests/test_base64.py | 190 ---
kitchen2/tests/test_collections.py | 3 -
kitchen2/tests/test_defaultdict.py | 180 ---
kitchen2/tests/test_pycompat.py | 25 -
kitchen2/tests/test_pycompat24.py | 109 --
kitchen2/tests/test_subprocess.py | 1467 ----------------------
kitchen3/docs/api-overview.rst | 3 -
kitchen3/docs/api-pycompat24.rst | 34 -
kitchen3/docs/api-pycompat25.rst | 8 -
kitchen3/docs/api-pycompat27.rst | 35 -
kitchen3/tests/subprocessdata/sigchild_ignore.py | 11 -
kitchen3/tests/test__all__.py | 2 -
kitchen3/tests/test_collections.py | 3 -
kitchen3/tests/test_deprecation_py3.py | 65 -
kitchen3/tests/test_pycompat.py | 25 -
setup.py | 14 -
24 files changed, 2273 deletions(-)
delete mode 100644 kitchen2/docs/api-pycompat24.rst
delete mode 100644 kitchen2/docs/api-pycompat25.rst
delete mode 100644 kitchen2/docs/api-pycompat27.rst
delete mode 100644 kitchen2/tests/subprocessdata/sigchild_ignore.py
delete mode 100644 kitchen2/tests/test_base64.py
delete mode 100644 kitchen2/tests/test_defaultdict.py
delete mode 100644 kitchen2/tests/test_pycompat.py
delete mode 100644 kitchen2/tests/test_pycompat24.py
delete mode 100644 kitchen2/tests/test_subprocess.py
delete mode 100644 kitchen3/docs/api-pycompat24.rst
delete mode 100644 kitchen3/docs/api-pycompat25.rst
delete mode 100644 kitchen3/docs/api-pycompat27.rst
delete mode 100644 kitchen3/tests/subprocessdata/sigchild_ignore.py
delete mode 100644 kitchen3/tests/test_deprecation_py3.py
delete mode 100644 kitchen3/tests/test_pycompat.py
diff --git a/kitchen2/docs/api-overview.rst b/kitchen2/docs/api-overview.rst
index dda56fe..e53a94d 100644
--- a/kitchen2/docs/api-overview.rst
+++ b/kitchen2/docs/api-overview.rst
@@ -16,9 +16,6 @@ that may drag in more dependencies can be found on the `project webpage`_
api-collections
api-iterutils
api-versioning
- api-pycompat24
- api-pycompat25
- api-pycompat27
api-exceptions
.. _`project webpage`: https://fedorahosted.org/kitchen
diff --git a/kitchen2/docs/api-pycompat24.rst b/kitchen2/docs/api-pycompat24.rst
deleted file mode 100644
index a3247b6..0000000
--- a/kitchen2/docs/api-pycompat24.rst
+++ /dev/null
@@ -1,34 +0,0 @@
-=======================
-Python 2.4 Compatibiity
-=======================
-
-
--------------------
-Sets for python-2.3
--------------------
-
-.. automodule:: kitchen.pycompat24.sets
-.. autofunction:: kitchen.pycompat24.sets.add_builtin_set
-
-----------------------------------
-Partial new style base64 interface
-----------------------------------
-
-.. automodule:: kitchen.pycompat24.base64
- :members:
-
-----------
-Subprocess
-----------
-
-.. seealso::
-
- :mod:`kitchen.pycompat27.subprocess`
- Kitchen includes the python-2.7 version of subprocess which has a new
- function, :func:`~kitchen.pycompat27.subprocess.check_output`. When
- you import :mod:`pycompat24.subprocess` you will be getting the
- python-2.7 version of subprocess rather than the 2.4 version (where
- subprocess first appeared). This choice was made so that we can
- concentrate our efforts on keeping the single version of subprocess up
- to date rather than working on a 2.4 version that very few people
- would need specifically.
diff --git a/kitchen2/docs/api-pycompat25.rst b/kitchen2/docs/api-pycompat25.rst
deleted file mode 100644
index 1841c6a..0000000
--- a/kitchen2/docs/api-pycompat25.rst
+++ /dev/null
@@ -1,8 +0,0 @@
-========================
-Python 2.5 Compatibility
-========================
-
-.. automodule:: kitchen.pycompat25
-
-.. automodule:: kitchen.pycompat25.collections._defaultdict
-
diff --git a/kitchen2/docs/api-pycompat27.rst b/kitchen2/docs/api-pycompat27.rst
deleted file mode 100644
index 9654b31..0000000
--- a/kitchen2/docs/api-pycompat27.rst
+++ /dev/null
@@ -1,35 +0,0 @@
-========================
-Python 2.7 Compatibility
-========================
-
-.. module:: kitchen.pycompat27.subprocess
-
---------------------------
-Subprocess from Python 2.7
---------------------------
-
-The :mod:`subprocess` module included here is a direct import from
-python-2.7's |stdlib|_. You can access it via::
-
- >>> from kitchen.pycompat27 import subprocess
-
-The motivation for including this module is that various API changing
-improvements have been made to subprocess over time. The following is a list
-of the known changes to :mod:`subprocess` with the python version they were
-introduced in:
-
-==================================== ===
-New API Feature Ver
-==================================== ===
-:exc:`subprocess.CalledProcessError` 2.5
-:func:`subprocess.check_call` 2.5
-:func:`subprocess.check_output` 2.7
-:meth:`subprocess.Popen.send_signal` 2.6
-:meth:`subprocess.Popen.terminate` 2.6
-:meth:`subprocess.Popen.kill` 2.6
-==================================== ===
-
-.. seealso::
-
- The stdlib :mod:`subprocess` documentation
- For complete documentation on how to use subprocess
diff --git a/kitchen2/kitchen/text/converters.py b/kitchen2/kitchen/text/converters.py
index e4bcb0d..eb518cb 100644
--- a/kitchen2/kitchen/text/converters.py
+++ b/kitchen2/kitchen/text/converters.py
@@ -50,9 +50,6 @@ import codecs
import warnings
import xml.sax.saxutils
-from kitchen.pycompat24 import sets
-sets.add_builtin_set()
-
from kitchen.text.exceptions import ControlCharError, XmlEncodeError
from kitchen.text.misc import guess_encoding, html_entities_unescape, \
isbytestring, isunicodestring, process_control_chars
diff --git a/kitchen2/kitchen/text/misc.py b/kitchen2/kitchen/text/misc.py
index 333e363..babb68e 100644
--- a/kitchen2/kitchen/text/misc.py
+++ b/kitchen2/kitchen/text/misc.py
@@ -43,11 +43,8 @@ try:
except ImportError:
chardet = None
-from kitchen.pycompat24 import sets
from kitchen.text.exceptions import ControlCharError
-sets.add_builtin_set()
-
# Define a threshold for chardet confidence. If we fall below this we decode
# byte strings we're guessing about as latin1
_CHARDET_THRESHHOLD = 0.6
diff --git a/kitchen2/tests/subprocessdata/sigchild_ignore.py b/kitchen2/tests/subprocessdata/sigchild_ignore.py
deleted file mode 100644
index 5b6dd08..0000000
--- a/kitchen2/tests/subprocessdata/sigchild_ignore.py
+++ /dev/null
@@ -1,11 +0,0 @@
-import os
-import signal, sys
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
-
-from kitchen.pycompat27.subprocess import _subprocess as subprocess
-
-# On Linux this causes os.waitpid to fail with OSError as the OS has already
-# reaped our child process. The wait() passing the OSError on to the caller
-# and causing us to exit with an error is what we are testing against.
-signal.signal(signal.SIGCHLD, signal.SIG_IGN)
-subprocess.Popen([sys.executable, '-c', 'print("albatross")']).wait()
diff --git a/kitchen2/tests/test__all__.py b/kitchen2/tests/test__all__.py
index dda287a..e08d699 100644
--- a/kitchen2/tests/test__all__.py
+++ b/kitchen2/tests/test__all__.py
@@ -4,8 +4,6 @@ from nose import tools
import os
import types
import warnings
-from kitchen.pycompat24.sets import add_builtin_set
-add_builtin_set()
def logit(msg):
log = open('/var/tmp/test.log', 'a')
diff --git a/kitchen2/tests/test_base64.py b/kitchen2/tests/test_base64.py
deleted file mode 100644
index 49e1cd1..0000000
--- a/kitchen2/tests/test_base64.py
+++ /dev/null
@@ -1,190 +0,0 @@
-import unittest
-from kitchen.pycompat24.base64 import _base64 as base64
-
-
-
-class LegacyBase64TestCase(unittest.TestCase):
- def test_encodestring(self):
- eq = self.assertEqual
- eq(base64.encodestring("www.python.org"), "d3d3LnB5dGhvbi5vcmc=\n")
- eq(base64.encodestring("a"), "YQ==\n")
- eq(base64.encodestring("ab"), "YWI=\n")
- eq(base64.encodestring("abc"), "YWJj\n")
- eq(base64.encodestring(""), "")
- eq(base64.encodestring("abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}"),
- "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n")
-
- def test_decodestring(self):
- eq = self.assertEqual
- eq(base64.decodestring("d3d3LnB5dGhvbi5vcmc=\n"), "www.python.org")
- eq(base64.decodestring("YQ==\n"), "a")
- eq(base64.decodestring("YWI=\n"), "ab")
- eq(base64.decodestring("YWJj\n"), "abc")
- eq(base64.decodestring("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n"),
- "abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}")
- eq(base64.decodestring(''), '')
-
- def test_encode(self):
- eq = self.assertEqual
- from cStringIO import StringIO
- infp = StringIO('abcdefghijklmnopqrstuvwxyz'
- 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- '0123456789!@#0^&*();:<>,. []{}')
- outfp = StringIO()
- base64.encode(infp, outfp)
- eq(outfp.getvalue(),
- 'YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE'
- 'RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT'
- 'Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n')
-
- def test_decode(self):
- from cStringIO import StringIO
- infp = StringIO('d3d3LnB5dGhvbi5vcmc=')
- outfp = StringIO()
- base64.decode(infp, outfp)
- self.assertEqual(outfp.getvalue(), 'www.python.org')
-
-
-
-class BaseXYTestCase(unittest.TestCase):
- def test_b64encode(self):
- eq = self.assertEqual
- # Test default alphabet
- eq(base64.b64encode("www.python.org"), "d3d3LnB5dGhvbi5vcmc=")
- eq(base64.b64encode('\x00'), 'AA==')
- eq(base64.b64encode("a"), "YQ==")
- eq(base64.b64encode("ab"), "YWI=")
- eq(base64.b64encode("abc"), "YWJj")
- eq(base64.b64encode(""), "")
- eq(base64.b64encode("abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}"),
- "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==")
- # Test with arbitrary alternative characters
- eq(base64.b64encode('\xd3V\xbeo\xf7\x1d', altchars='*$'), '01a*b$cd')
- # Test standard alphabet
- eq(base64.standard_b64encode("www.python.org"), "d3d3LnB5dGhvbi5vcmc=")
- eq(base64.standard_b64encode("a"), "YQ==")
- eq(base64.standard_b64encode("ab"), "YWI=")
- eq(base64.standard_b64encode("abc"), "YWJj")
- eq(base64.standard_b64encode(""), "")
- eq(base64.standard_b64encode("abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}"),
- "YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==")
- # Test with 'URL safe' alternative characters
- eq(base64.urlsafe_b64encode('\xd3V\xbeo\xf7\x1d'), '01a-b_cd')
-
- def test_b64decode(self):
- eq = self.assertEqual
- eq(base64.b64decode("d3d3LnB5dGhvbi5vcmc="), "www.python.org")
- eq(base64.b64decode('AA=='), '\x00')
- eq(base64.b64decode("YQ=="), "a")
- eq(base64.b64decode("YWI="), "ab")
- eq(base64.b64decode("YWJj"), "abc")
- eq(base64.b64decode("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ=="),
- "abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}")
- eq(base64.b64decode(''), '')
- # Test with arbitrary alternative characters
- eq(base64.b64decode('01a*b$cd', altchars='*$'), '\xd3V\xbeo\xf7\x1d')
- # Test standard alphabet
- eq(base64.standard_b64decode("d3d3LnB5dGhvbi5vcmc="), "www.python.org")
- eq(base64.standard_b64decode("YQ=="), "a")
- eq(base64.standard_b64decode("YWI="), "ab")
- eq(base64.standard_b64decode("YWJj"), "abc")
- eq(base64.standard_b64decode(""), "")
- eq(base64.standard_b64decode("YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
- "RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0NT"
- "Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ=="),
- "abcdefghijklmnopqrstuvwxyz"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "0123456789!@#0^&*();:<>,. []{}")
- # Test with 'URL safe' alternative characters
- eq(base64.urlsafe_b64decode('01a-b_cd'), '\xd3V\xbeo\xf7\x1d')
-
- def test_b64decode_error(self):
- self.assertRaises(TypeError, base64.b64decode, 'abc')
-
- def test_b32encode(self):
- eq = self.assertEqual
- eq(base64.b32encode(''), '')
- eq(base64.b32encode('\x00'), 'AA======')
- eq(base64.b32encode('a'), 'ME======')
- eq(base64.b32encode('ab'), 'MFRA====')
- eq(base64.b32encode('abc'), 'MFRGG===')
- eq(base64.b32encode('abcd'), 'MFRGGZA=')
- eq(base64.b32encode('abcde'), 'MFRGGZDF')
-
- def test_b32decode(self):
- eq = self.assertEqual
- eq(base64.b32decode(''), '')
- eq(base64.b32decode('AA======'), '\x00')
- eq(base64.b32decode('ME======'), 'a')
- eq(base64.b32decode('MFRA===='), 'ab')
- eq(base64.b32decode('MFRGG==='), 'abc')
- eq(base64.b32decode('MFRGGZA='), 'abcd')
- eq(base64.b32decode('MFRGGZDF'), 'abcde')
-
- def test_b32decode_casefold(self):
- eq = self.assertEqual
- eq(base64.b32decode('', True), '')
- eq(base64.b32decode('ME======', True), 'a')
- eq(base64.b32decode('MFRA====', True), 'ab')
- eq(base64.b32decode('MFRGG===', True), 'abc')
- eq(base64.b32decode('MFRGGZA=', True), 'abcd')
- eq(base64.b32decode('MFRGGZDF', True), 'abcde')
- # Lower cases
- eq(base64.b32decode('me======', True), 'a')
- eq(base64.b32decode('mfra====', True), 'ab')
- eq(base64.b32decode('mfrgg===', True), 'abc')
- eq(base64.b32decode('mfrggza=', True), 'abcd')
- eq(base64.b32decode('mfrggzdf', True), 'abcde')
- # Expected exceptions
- self.assertRaises(TypeError, base64.b32decode, 'me======')
- # Mapping zero and one
- eq(base64.b32decode('MLO23456'), 'b\xdd\xad\xf3\xbe')
- eq(base64.b32decode('M1023456', map01='L'), 'b\xdd\xad\xf3\xbe')
- eq(base64.b32decode('M1023456', map01='I'), 'b\x1d\xad\xf3\xbe')
-
- def test_b32decode_error(self):
- self.assertRaises(TypeError, base64.b32decode, 'abc')
- self.assertRaises(TypeError, base64.b32decode, 'ABCDEF==')
-
- def test_b16encode(self):
- eq = self.assertEqual
- eq(base64.b16encode('\x01\x02\xab\xcd\xef'), '0102ABCDEF')
- eq(base64.b16encode('\x00'), '00')
-
- def test_b16decode(self):
- eq = self.assertEqual
- eq(base64.b16decode('0102ABCDEF'), '\x01\x02\xab\xcd\xef')
- eq(base64.b16decode('00'), '\x00')
- # Lower case is not allowed without a flag
- self.assertRaises(TypeError, base64.b16decode, '0102abcdef')
- # Case fold
- eq(base64.b16decode('0102abcdef', True), '\x01\x02\xab\xcd\xef')
-
-
-
-#from test import test_support
-#def test_main():
-# test_support.run_unittest(__name__)
-#
-#if __name__ == '__main__':
-# test_main()
diff --git a/kitchen2/tests/test_collections.py b/kitchen2/tests/test_collections.py
index 5e4dbd4..8765594 100644
--- a/kitchen2/tests/test_collections.py
+++ b/kitchen2/tests/test_collections.py
@@ -3,9 +3,6 @@
import unittest
from nose import tools
-from kitchen.pycompat24.sets import add_builtin_set
-add_builtin_set()
-
from kitchen import collections
def test_strict_dict_get_set():
diff --git a/kitchen2/tests/test_defaultdict.py b/kitchen2/tests/test_defaultdict.py
deleted file mode 100644
index 5848d28..0000000
--- a/kitchen2/tests/test_defaultdict.py
+++ /dev/null
@@ -1,180 +0,0 @@
-"""Unit tests for collections.defaultdict."""
-
-import os
-import copy
-import tempfile
-import unittest
-
-from kitchen.pycompat25.collections._defaultdict import defaultdict
-
-def foobar():
- return list
-
-class TestDefaultDict(unittest.TestCase):
-
- def test_basic(self):
- d1 = defaultdict()
- self.assertEqual(d1.default_factory, None)
- d1.default_factory = list
- d1[12].append(42)
- self.assertEqual(d1, {12: [42]})
- d1[12].append(24)
- self.assertEqual(d1, {12: [42, 24]})
- d1[13]
- d1[14]
- self.assertEqual(d1, {12: [42, 24], 13: [], 14: []})
- self.assert_(d1[12] is not d1[13] is not d1[14])
- d2 = defaultdict(list, foo=1, bar=2)
- self.assertEqual(d2.default_factory, list)
- self.assertEqual(d2, {"foo": 1, "bar": 2})
- self.assertEqual(d2["foo"], 1)
- self.assertEqual(d2["bar"], 2)
- self.assertEqual(d2[42], [])
- self.assert_("foo" in d2)
- self.assert_("foo" in d2.keys())
- self.assert_("bar" in d2)
- self.assert_("bar" in d2.keys())
- self.assert_(42 in d2)
- self.assert_(42 in d2.keys())
- self.assert_(12 not in d2)
- self.assert_(12 not in d2.keys())
- d2.default_factory = None
- self.assertEqual(d2.default_factory, None)
- try:
- d2[15]
- except KeyError, err:
- self.assertEqual(err.args, (15,))
- else:
- self.fail("d2[15] didn't raise KeyError")
- self.assertRaises(TypeError, defaultdict, 1)
-
- def test_missing(self):
- d1 = defaultdict()
- self.assertRaises(KeyError, d1.__missing__, 42)
- d1.default_factory = list
- self.assertEqual(d1.__missing__(42), [])
-
- def test_repr(self):
- d1 = defaultdict()
- self.assertEqual(d1.default_factory, None)
- self.assertEqual(repr(d1), "defaultdict(None, {})")
- self.assertEqual(eval(repr(d1)), d1)
- d1[11] = 41
- self.assertEqual(repr(d1), "defaultdict(None, {11: 41})")
- d2 = defaultdict(int)
- self.assertEqual(d2.default_factory, int)
- d2[12] = 42
- self.assertEqual(repr(d2), "defaultdict(<type 'int'>, {12: 42})")
- def foo(): return 43
- d3 = defaultdict(foo)
-
- self.assert_(d3.default_factory is foo)
- d3[13]
- self.assertEqual(repr(d3), "defaultdict(%s, {13: 43})" % repr(foo))
-
- def test_print(self):
- d1 = defaultdict()
- def foo(): return 42
- d2 = defaultdict(foo, {1: 2})
- # NOTE: We can't use tempfile.[Named]TemporaryFile since this
- # code must exercise the tp_print C code, which only gets
- # invoked for *real* files.
- tfn = tempfile.mktemp()
- try:
- f = open(tfn, "w+")
- try:
- print >>f, d1
- print >>f, d2
- f.seek(0)
- self.assertEqual(f.readline(), repr(d1) + "\n")
- self.assertEqual(f.readline(), repr(d2) + "\n")
- finally:
- f.close()
- finally:
- os.remove(tfn)
-
- def test_copy(self):
- d1 = defaultdict()
- d2 = d1.copy()
- self.assertEqual(type(d2), defaultdict)
- self.assertEqual(d2.default_factory, None)
- self.assertEqual(d2, {})
- d1.default_factory = list
- d3 = d1.copy()
- self.assertEqual(type(d3), defaultdict)
- self.assertEqual(d3.default_factory, list)
- self.assertEqual(d3, {})
- d1[42]
- d4 = d1.copy()
- self.assertEqual(type(d4), defaultdict)
- self.assertEqual(d4.default_factory, list)
- self.assertEqual(d4, {42: []})
- d4[12]
- self.assertEqual(d4, {42: [], 12: []})
-
- # Issue 6637: Copy fails for empty default dict
- d = defaultdict()
- d['a'] = 42
- e = d.copy()
- self.assertEqual(e['a'], 42)
-
- def test_shallow_copy(self):
- d1 = defaultdict(foobar, {1: 1})
- d2 = copy.copy(d1)
- self.assertEqual(d2.default_factory, foobar)
- self.assertEqual(d2, d1)
- d1.default_factory = list
- d2 = copy.copy(d1)
- self.assertEqual(d2.default_factory, list)
- self.assertEqual(d2, d1)
-
- def test_deep_copy(self):
- d1 = defaultdict(foobar, {1: [1]})
- d2 = copy.deepcopy(d1)
- self.assertEqual(d2.default_factory, foobar)
- self.assertEqual(d2, d1)
- self.assert_(d1[1] is not d2[1])
- d1.default_factory = list
- d2 = copy.deepcopy(d1)
- self.assertEqual(d2.default_factory, list)
- self.assertEqual(d2, d1)
-
- def test_keyerror_without_factory(self):
- d1 = defaultdict()
- try:
- d1[(1,)]
- except KeyError, err:
- self.assertEqual(err.args[0], (1,))
- else:
- self.fail("expected KeyError")
-
- def test_recursive_repr(self):
- # Issue2045: stack overflow when default_factory is a bound method
- class sub(defaultdict):
- def __init__(self):
- self.default_factory = self._factory
- def _factory(self):
- return []
- d = sub()
- self.assert_(repr(d).startswith(
- "defaultdict(<bound method sub._factory of defaultdict(..."))
-
- # NOTE: printing a subclass of a builtin type does not call its
- # tp_print slot. So this part is essentially the same test as above.
- tfn = tempfile.mktemp()
- try:
- f = open(tfn, "w+")
- try:
- print >>f, d
- finally:
- f.close()
- finally:
- os.remove(tfn)
-
-
-#from test import test_support
-#def test_main():
-# test_support.run_unittest(TestDefaultDict)
-#
-#if __name__ == "__main__":
-# test_main()
diff --git a/kitchen2/tests/test_pycompat.py b/kitchen2/tests/test_pycompat.py
deleted file mode 100644
index 50a059b..0000000
--- a/kitchen2/tests/test_pycompat.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-import unittest
-from nose import tools
-
-class TestUsableModules(unittest.TestCase):
- def test_subprocess(self):
- '''Test that importing subprocess as a module works
- '''
- try:
- from kitchen.pycompat24.subprocess import Popen
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat24.subprocess as a module')
- try:
- from kitchen.pycompat27.subprocess import Popen
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat27.subprocess as a module')
-
- def test_base64(self):
- '''Test that importing base64 as a module works
- '''
- try:
- from kitchen.pycompat24.base64 import b64encode
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat24.base64 as a module')
diff --git a/kitchen2/tests/test_pycompat24.py b/kitchen2/tests/test_pycompat24.py
deleted file mode 100644
index adea7fe..0000000
--- a/kitchen2/tests/test_pycompat24.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-import unittest
-from nose import tools
-from nose.plugins.skip import SkipTest
-
-import __builtin__
-import base64 as py_b64
-import warnings
-
-from kitchen.pycompat24 import sets
-from kitchen.pycompat24.base64 import _base64 as base64
-
-class TestSetsNoOverwrite(unittest.TestCase):
- def setUp(self):
- self.set_val = None
- self.frozenset_val = None
- if not hasattr(__builtin__, 'set'):
- __builtin__.set = self.set_val
- else:
- self.set_val = __builtin__.set
- if not hasattr(__builtin__, 'frozenset'):
- __builtin__.frozenset = self.frozenset_val
- else:
- self.frozenset_val = __builtin__.frozenset
-
- def tearDown(self):
- if self.frozenset_val == None:
- del(__builtin__.frozenset)
- if self.set_val == None:
- del(__builtin__.set)
-
- def test_sets_dont_overwrite(self):
- '''Test that importing sets when there's already a set and frozenset defined does not overwrite
- '''
- sets.add_builtin_set()
- tools.ok_(__builtin__.set == self.set_val)
- tools.ok_(__builtin__.frozenset == self.frozenset_val)
-
-class TestDefineSets(unittest.TestCase):
- def setUp(self):
- warnings.simplefilter('ignore', DeprecationWarning)
- self.set_val = None
- self.frozenset_val = None
- if hasattr(__builtin__, 'set'):
- self.set_val = __builtin__.set
- del(__builtin__.set)
- if hasattr(__builtin__, 'frozenset'):
- self.frozenset_val = __builtin__.frozenset
- del(__builtin__.frozenset)
-
- def tearDown(self):
- warnings.simplefilter('default', DeprecationWarning)
- if self.set_val:
- __builtin__.set = self.set_val
- else:
- del(__builtin__.set)
- if self.frozenset_val:
- __builtin__.frozenset = self.frozenset_val
- else:
- del(__builtin__.frozenset)
-
- def test_pycompat_defines_set(self):
- '''Test that calling pycompat24.add_builtin_set() adds set and frozenset to __builtin__
- '''
- import sets as py_sets
- sets.add_builtin_set()
- if self.set_val:
- tools.ok_(__builtin__.set == self.set_val)
- tools.ok_(__builtin__.frozenset == self.frozenset_val)
- else:
- tools.ok_(__builtin__.set == py_sets.Set)
- tools.ok_(__builtin__.frozenset == py_sets.ImmutableSet)
-
-class TestSubprocess(unittest.TestCase):
- pass
-
-class TestBase64(unittest.TestCase):
- b_byte_chars = ' '.join(map(chr, range(0, 256)))
- b_byte_encoded = 'ACABIAIgAyAEIAUgBiAHIAggCSAKIAsgDCANIA4gDyAQIBEgEiATIBQgFSAWIBcgGCAZIBogGyAcIB0gHiAfICAgISAiICMgJCAlICYgJyAoICkgKiArICwgLSAuIC8gMCAxIDIgMyA0IDUgNiA3IDggOSA6IDsgPCA9ID4gPyBAIEEgQiBDIEQgRSBGIEcgSCBJIEogSyBMIE0gTiBPIFAgUSBSIFMgVCBVIFYgVyBYIFkgWiBbIFwgXSBeIF8gYCBhIGIgYyBkIGUgZiBnIGggaSBqIGsgbCBtIG4gbyBwIHEgciBzIHQgdSB2IHcgeCB5IHogeyB8IH0gfiB/IIAggSCCIIMghCCFIIYghyCIIIkgiiCLIIwgjSCOII8gkCCRIJIgkyCUIJUgliCXIJggmSCaIJsgnCCdIJ4gnyCgIKEgoiCjIKQgpSCmIKcgqCCpIKogqyCsIK0griCvILAgsSCyILMgtCC1ILYgtyC4ILkguiC7ILwgvSC+IL8gwCDBIMIgwyDEIMUgxiDHIMggySDKIMsgzCDNIM4gzyDQINEg0iDTINQg1SDWINcg2CDZINog2yDcIN0g3iDfIOAg4SDiIOMg5CDlIOYg5yDoIOkg6iDrIOwg7SDuIO8g8CDxIPIg8yD0IPUg9iD3IPgg+SD6IPsg/CD9IP4g/w=='
- b_byte_encoded_urlsafe = 'ACABIAIgAyAEIAUgBiAHIAggCSAKIAsgDCANIA4gDyAQIBEgEiATIBQgFSAWIBcgGCAZIBogGyAcIB0gHiAfICAgISAiICMgJCAlICYgJyAoICkgKiArICwgLSAuIC8gMCAxIDIgMyA0IDUgNiA3IDggOSA6IDsgPCA9ID4gPyBAIEEgQiBDIEQgRSBGIEcgSCBJIEogSyBMIE0gTiBPIFAgUSBSIFMgVCBVIFYgVyBYIFkgWiBbIFwgXSBeIF8gYCBhIGIgYyBkIGUgZiBnIGggaSBqIGsgbCBtIG4gbyBwIHEgciBzIHQgdSB2IHcgeCB5IHogeyB8IH0gfiB_IIAggSCCIIMghCCFIIYghyCIIIkgiiCLIIwgjSCOII8gkCCRIJIgkyCUIJUgliCXIJggmSCaIJsgnCCdIJ4gnyCgIKEgoiCjIKQgpSCmIKcgqCCpIKogqyCsIK0griCvILAgsSCyILMgtCC1ILYgtyC4ILkguiC7ILwgvSC-IL8gwCDBIMIgwyDEIMUgxiDHIMggySDKIMsgzCDNIM4gzyDQINEg0iDTINQg1SDWINcg2CDZINog2yDcIN0g3iDfIOAg4SDiIOMg5CDlIOYg5yDoIOkg6iDrIOwg7SDuIO8g8CDxIPIg8yD0IPUg9iD3IPgg-SD6IPsg_CD9IP4g_w=='
-
- def test_base64_encode(self):
- tools.ok_(base64.b64encode(self.b_byte_chars) == self.b_byte_encoded)
- tools.ok_(base64.b64encode(self.b_byte_chars, altchars='-_') == self.b_byte_encoded_urlsafe)
- tools.ok_(base64.standard_b64encode(self.b_byte_chars) == self.b_byte_encoded)
- tools.ok_(base64.urlsafe_b64encode(self.b_byte_chars) == self.b_byte_encoded_urlsafe)
-
- tools.ok_(base64.b64encode(self.b_byte_chars) == self.b_byte_encoded)
- tools.ok_(base64.b64encode(self.b_byte_chars, altchars='-_') == self.b_byte_encoded_urlsafe)
- tools.ok_(base64.standard_b64encode(self.b_byte_chars) == self.b_byte_encoded)
- tools.ok_(base64.urlsafe_b64encode(self.b_byte_chars) == self.b_byte_encoded_urlsafe)
-
- def test_base64_decode(self):
- tools.ok_(base64.b64decode(self.b_byte_encoded) == self.b_byte_chars)
- tools.ok_(base64.b64decode(self.b_byte_encoded_urlsafe, altchars='-_') == self.b_byte_chars)
- tools.ok_(base64.standard_b64decode(self.b_byte_encoded) == self.b_byte_chars)
- tools.ok_(base64.urlsafe_b64decode(self.b_byte_encoded_urlsafe) == self.b_byte_chars)
-
- tools.ok_(base64.b64decode(self.b_byte_encoded) == self.b_byte_chars)
- tools.ok_(base64.b64decode(self.b_byte_encoded_urlsafe, altchars='-_') == self.b_byte_chars)
- tools.ok_(base64.standard_b64decode(self.b_byte_encoded) == self.b_byte_chars)
- tools.ok_(base64.urlsafe_b64decode(self.b_byte_encoded_urlsafe) == self.b_byte_chars)
-
- def test_base64_stdlib_compat(self):
- if not hasattr(py_b64, 'b64encode'):
- raise SkipTest('Python-2.3 doesn\'t have b64encode to compare against')
- tools.ok_(base64.b64encode(self.b_byte_chars) == py_b64.b64encode(self.b_byte_chars))
- tools.ok_(base64.b64decode(self.b_byte_chars) == py_b64.b64decode(self.b_byte_chars))
diff --git a/kitchen2/tests/test_subprocess.py b/kitchen2/tests/test_subprocess.py
deleted file mode 100644
index 010f660..0000000
--- a/kitchen2/tests/test_subprocess.py
+++ /dev/null
@@ -1,1467 +0,0 @@
-import unittest
-from nose.plugins.skip import SkipTest
-from kitchen.pycompat27.subprocess import _subprocess as subprocess
-import sys
-import StringIO
-import signal
-import os
-import errno
-import tempfile
-import time
-import re
-# Not available on python2.6 or less
-#import sysconfig
-
-mswindows = (sys.platform == "win32")
-
-#
-# Depends on the following external programs: Python
-#
-
-if mswindows:
- SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
- 'os.O_BINARY);')
-else:
- SETBINARY = ''
-
-def reap_children():
- """Use this function at the end of test_main() whenever sub-processes
- are started. This will help ensure that no extra children (zombies)
- stick around to hog resources and create problems when looking
- for refleaks.
- """
-
- # Reap all our dead child processes so we don't leave zombies around.
- # These hog resources and might be causing some of the buildbots to die.
- if hasattr(os, 'waitpid'):
- any_process = -1
- while True:
- try:
- # This will raise an exception on Windows. That's ok.
- pid, status = os.waitpid(any_process, os.WNOHANG)
- if pid == 0:
- break
- except:
- break
-
-test_support = None
-try:
- from test import test_support
- if not hasattr(test_support, 'reap_children'):
- # No reap_children in python-2.3
- test_support.reap_children = reap_children
-except ImportError:
- pass
-
-# In a debug build, stuff like "[6580 refs]" is printed to stderr at
-# shutdown time. That frustrates tests trying to check stderr produced
-# from a spawned Python process.
-def remove_stderr_debug_decorations(stderr):
- return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
-
-try:
- mkstemp = tempfile.mkstemp
-except AttributeError:
- # tempfile.mkstemp is not available
- def mkstemp():
- """Replacement for mkstemp, calling mktemp."""
- fname = tempfile.mktemp()
- return os.open(fname, os.O_RDWR|os.O_CREAT), fname
-
-
-class BaseTestCase(unittest.TestCase):
- def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
- if not hasattr(self, '_cleanups'):
- self._cleanups = []
- if not hasattr(self, 'addCleanup'):
- self.addCleanup = self._addCleanup
-
- def _addCleanup(self, function, *args, **kwargs):
- self._cleanups.append((function, args, kwargs))
-
- def setUp(self):
- # Try to minimize the number of children we have so this test
- # doesn't crash on some buildbots (Alphas in particular).
- if test_support:
- test_support.reap_children()
-
- def tearDown(self):
- for inst in subprocess._active:
- inst.wait()
- subprocess._cleanup()
- # assertFalse is not available in python-2.3
- self.failIf(subprocess._active, "subprocess._active not empty")
-
- if not hasattr(self, 'doCleanups'):
- ok = True
- while self._cleanups:
- function, args, kwargs = self._cleanups.pop(-1)
- try:
- function(*args, **kwargs)
- except Exception:
- ok = False
-
- def assertStderrEqual(self, stderr, expected, msg=None):
- # In a debug build, stuff like "[6580 refs]" is printed to stderr at
- # shutdown time. That frustrates tests trying to check stderr produced
- # from a spawned Python process.
- actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
- self.assertEqual(actual, expected, msg)
-
-
-class ProcessTestCase(BaseTestCase):
-
- def test_call_seq(self):
- # call() function with sequence argument
- rc = subprocess.call([sys.executable, "-c",
- "import sys; sys.exit(47)"])
- self.assertEqual(rc, 47)
-
- def test_check_call_zero(self):
- # check_call() function with zero return code
- rc = subprocess.check_call([sys.executable, "-c",
- "import sys; sys.exit(0)"])
- self.assertEqual(rc, 0)
-
- def test_check_call_nonzero(self):
- # check_call() function with non-zero return code
- #with self.assertRaises(subprocess.CalledProcessError) as c:
- try:
- subprocess.check_call([sys.executable, "-c",
- "import sys; sys.exit(47)"])
- #self.assertEqual(c.exception.returncode, 47)
- except subprocess.CalledProcessError, e:
- self.assertEqual(e.returncode, 47)
- else:
- self.fail("Expected CalledProcessError")
-
- def test_check_output(self):
- # check_output() function with zero return code
- output = subprocess.check_output(
- [sys.executable, "-c", "print 'BDFL'"])
- #self.assertIn('BDFL', output)
- self.assert_('BDFL' in output)
-
- def test_check_output_nonzero(self):
- # check_call() function with non-zero return code
- #with self.assertRaises(subprocess.CalledProcessError) as c:
- try:
- subprocess.check_output(
- [sys.executable, "-c", "import sys; sys.exit(5)"])
- #self.assertEqual(c.exception.returncode, 5)
- except subprocess.CalledProcessError, e:
- self.assertEqual(e.returncode, 5)
- else:
- self.fail("Expected CalledProcessError")
-
- def test_check_output_stderr(self):
- # check_output() function stderr redirected to stdout
- output = subprocess.check_output(
- [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
- stderr=subprocess.STDOUT)
- #self.assertIn('BDFL', output)
- self.assert_('BDFL' in output)
-
- def test_check_output_stdout_arg(self):
- # check_output() function stderr redirected to stdout
- #with self.assertRaises(ValueError) as c:
- try:
- output = subprocess.check_output(
- [sys.executable, "-c", "print 'will not be run'"],
- stdout=sys.stdout)
- #self.fail("Expected ValueError when stdout arg supplied.")
- #self.assertIn('stdout', c.exception.args[0])
- except ValueError, e:
- #self.assertIn('stdout', e.args[0])
- self.assert_('stdout' in e.args[0])
- else:
- self.fail("Expected ValueError when stdout arg supplied.")
-
- def test_call_kwargs(self):
- # call() function with keyword args
- newenv = os.environ.copy()
- newenv["FRUIT"] = "banana"
- rc = subprocess.call([sys.executable, "-c",
- 'import sys, os;'
- 'sys.exit(os.getenv("FRUIT")=="banana")'],
- env=newenv)
- self.assertEqual(rc, 1)
-
- def test_invalid_args(self):
- # Popen() called with invalid arguments should raise TypeError
- # but Popen.__del__ should not complain (issue #12085)
- #with test_support.captured_stderr() as s:
- orig_stderr = getattr(sys, 'stderr')
- setattr(sys, 'stderr', StringIO.StringIO())
- s = sys.stderr
- try:
- try:
- subprocess.Popen(invalid_arg_name=1)
- except TypeError:
- pass
- except:
- self.fail("Expected TypeError")
- else:
- self.fail("Expected TypeError")
- argcount = subprocess.Popen.__init__.func_code.co_argcount
- too_many_args = [0] * (argcount + 1)
- try:
- subprocess.Popen(*too_many_args)
- except TypeError:
- pass
- except:
- self.fail("Expected TypeError")
- else:
- self.fail("Expected TypeError")
- finally:
- setattr(sys,'stderr', orig_stderr)
- self.assertEqual(s.getvalue(), '')
-
- def test_stdin_none(self):
- # .stdin is None when not redirected
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- p.wait()
- self.assertEqual(p.stdin, None)
-
- def test_stdout_none(self):
- # .stdout is None when not redirected
- p = subprocess.Popen([sys.executable, "-c",
- 'print " this bit of output is from a '
- 'test of stdout in a different '
- 'process ..."'],
- stdin=subprocess.PIPE, stderr=subprocess.PIPE)
- self.addCleanup(p.stdin.close)
- self.addCleanup(p.stderr.close)
- p.wait()
- self.assertEqual(p.stdout, None)
-
- def test_stderr_none(self):
- # .stderr is None when not redirected
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stdin.close)
- p.wait()
- self.assertEqual(p.stderr, None)
-
- def test_executable_with_cwd(self):
- python_dir = os.path.dirname(os.path.realpath(sys.executable))
- p = subprocess.Popen(["somethingyoudonthave", "-c",
- "import sys; sys.exit(47)"],
- executable=sys.executable, cwd=python_dir)
- p.wait()
- self.assertEqual(p.returncode, 47)
-
- # Not available on python2.3 and we know we're not building python itself
- #@unittest.skipIf(sysconfig.is_python_build(),
- # "need an installed Python. See #7774")
- def test_executable_without_cwd(self):
- # For a normal installation, it should work without 'cwd'
- # argument. For test runs in the build directory, see #7774.
- p = subprocess.Popen(["somethingyoudonthave", "-c",
- "import sys; sys.exit(47)"],
- executable=sys.executable)
- p.wait()
- self.assertEqual(p.returncode, 47)
-
- def test_stdin_pipe(self):
- # stdin redirection
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],
- stdin=subprocess.PIPE)
- p.stdin.write("pear")
- p.stdin.close()
- p.wait()
- self.assertEqual(p.returncode, 1)
-
- def test_stdin_filedes(self):
- # stdin is set to open file descriptor
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- os.write(d, "pear")
- os.lseek(d, 0, 0)
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],
- stdin=d)
- p.wait()
- self.assertEqual(p.returncode, 1)
-
- def test_stdin_fileobj(self):
- # stdin is set to open file object
- tf = tempfile.TemporaryFile()
- tf.write("pear")
- tf.seek(0)
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],
- stdin=tf)
- p.wait()
- self.assertEqual(p.returncode, 1)
-
- def test_stdout_pipe(self):
- # stdout redirection
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stdout.write("orange")'],
- stdout=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), "orange")
-
- def test_stdout_filedes(self):
- # stdout is set to open file descriptor
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stdout.write("orange")'],
- stdout=d)
- p.wait()
- os.lseek(d, 0, 0)
- self.assertEqual(os.read(d, 1024), "orange")
-
- def test_stdout_fileobj(self):
- # stdout is set to open file object
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stdout.write("orange")'],
- stdout=tf)
- p.wait()
- tf.seek(0)
- self.assertEqual(tf.read(), "orange")
-
- def test_stderr_pipe(self):
- # stderr redirection
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stderr.write("strawberry")'],
- stderr=subprocess.PIPE)
- self.addCleanup(p.stderr.close)
- #self.assertStderrEqual(p.stderr.read(), "strawberry")
- self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
- "strawberry")
-
- def test_stderr_filedes(self):
- # stderr is set to open file descriptor
- tf = tempfile.TemporaryFile()
- d = tf.fileno()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stderr.write("strawberry")'],
- stderr=d)
- p.wait()
- os.lseek(d, 0, 0)
- #self.assertStderrEqual(os.read(d, 1024), "strawberry")
- self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
- "strawberry")
-
- def test_stderr_fileobj(self):
- # stderr is set to open file object
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stderr.write("strawberry")'],
- stderr=tf)
- p.wait()
- tf.seek(0)
- #self.assertStderrEqual(tf.read(), "strawberry")
- self.assertEqual(remove_stderr_debug_decorations(tf.read()),
- "strawberry")
-
- def test_stdout_stderr_pipe(self):
- # capture stdout and stderr to the same pipe
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.stdout.write("apple");'
- 'sys.stdout.flush();'
- 'sys.stderr.write("orange")'],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- self.addCleanup(p.stdout.close)
- #self.assertStderrEqual(p.stdout.read(), "appleorange")
- output = p.stdout.read()
- stripped = remove_stderr_debug_decorations(output)
- self.assertEqual(stripped, "appleorange")
-
- def test_stdout_stderr_file(self):
- # capture stdout and stderr to the same open file
- tf = tempfile.TemporaryFile()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.stdout.write("apple");'
- 'sys.stdout.flush();'
- 'sys.stderr.write("orange")'],
- stdout=tf,
- stderr=tf)
- p.wait()
- tf.seek(0)
- #self.assertStderrEqual(tf.read(), "appleorange")
- output = tf.read()
- stripped = remove_stderr_debug_decorations(output)
- self.assertEqual(stripped, "appleorange")
-
- def test_stdout_filedes_of_stdout(self):
- # stdout is set to 1 (#1531862).
- cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
- rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
- self.assertEqual(rc, 2)
-
- def test_cwd(self):
- tmpdir = tempfile.gettempdir()
- # We cannot use os.path.realpath to canonicalize the path,
- # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
- cwd = os.getcwd()
- os.chdir(tmpdir)
- tmpdir = os.getcwd()
- os.chdir(cwd)
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(os.getcwd())'],
- stdout=subprocess.PIPE,
- cwd=tmpdir)
- self.addCleanup(p.stdout.close)
- normcase = os.path.normcase
- self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
-
- def test_env(self):
- newenv = os.environ.copy()
- newenv["FRUIT"] = "orange"
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(os.getenv("FRUIT"))'],
- stdout=subprocess.PIPE,
- env=newenv)
- self.assertEqual(p.stdout.read(), "orange")
-
- def test_communicate_stdin(self):
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.exit(sys.stdin.read() == "pear")'],
- stdin=subprocess.PIPE)
- p.communicate("pear")
- self.assertEqual(p.returncode, 1)
-
- def test_communicate_stdout(self):
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stdout.write("pineapple")'],
- stdout=subprocess.PIPE)
- (stdout, stderr) = p.communicate()
- self.assertEqual(stdout, "pineapple")
- self.assertEqual(stderr, None)
-
- def test_communicate_stderr(self):
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; sys.stderr.write("pineapple")'],
- stderr=subprocess.PIPE)
- (stdout, stderr) = p.communicate()
- self.assertEqual(stdout, None)
- #self.assertStderrEqual(stderr, "pineapple")
- self.assertEqual(remove_stderr_debug_decorations(stderr), "pineapple")
-
- def test_communicate(self):
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stderr.write("pineapple");'
- 'sys.stdout.write(sys.stdin.read())'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- (stdout, stderr) = p.communicate("banana")
- self.assertEqual(stdout, "banana")
- #self.assertStderrEqual(stderr, "pineapple")
- self.assertEqual(remove_stderr_debug_decorations(stderr),
- "pineapple")
-
- # Not available with python-2.6's unittest: Reimplement by
- # raising SkipTest from nose
- # This test is Linux specific for simplicity to at least have
- # some coverage. It is not a platform specific bug.
- #@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
- # "Linux specific")
- # Test for the fd leak reported in http://bugs.python.org/issue2791.
- def test_communicate_pipe_fd_leak(self):
- if not os.path.isdir('/proc/%d/fd' % os.getpid()):
- raise SkipTest('Linux specific')
- fd_directory = '/proc/%d/fd' % os.getpid()
- num_fds_before_popen = len(os.listdir(fd_directory))
- p = subprocess.Popen([sys.executable, "-c", "print()"],
- stdout=subprocess.PIPE)
- p.communicate()
- num_fds_after_communicate = len(os.listdir(fd_directory))
- del p
- num_fds_after_destruction = len(os.listdir(fd_directory))
- self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
- self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
-
- def test_communicate_returns(self):
- # communicate() should return None if no redirection is active
- p = subprocess.Popen([sys.executable, "-c",
- "import sys; sys.exit(47)"])
- (stdout, stderr) = p.communicate()
- self.assertEqual(stdout, None)
- self.assertEqual(stderr, None)
-
- def test_communicate_pipe_buf(self):
- # communicate() with writes larger than pipe_buf
- # This test will probably deadlock rather than fail, if
- # communicate() does not work properly.
- x, y = os.pipe()
- if mswindows:
- pipe_buf = 512
- else:
- pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
- os.close(x)
- os.close(y)
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(sys.stdin.read(47));'
- 'sys.stderr.write("xyz"*%d);'
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- string_to_write = "abc"*pipe_buf
- (stdout, stderr) = p.communicate(string_to_write)
- self.assertEqual(stdout, string_to_write)
-
- def test_writes_before_communicate(self):
- # stdin.write before communicate()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;'
- 'sys.stdout.write(sys.stdin.read())'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- p.stdin.write("banana")
- (stdout, stderr) = p.communicate("split")
- self.assertEqual(stdout, "bananasplit")
- #self.assertStderrEqual(stderr, "")
- self.assertEqual(remove_stderr_debug_decorations(stderr), "")
-
- def test_universal_newlines(self):
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY +
- 'sys.stdout.write("line1\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line2\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line3\\r\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line4\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline5");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline6");'],
- stdout=subprocess.PIPE,
- universal_newlines=1)
- self.addCleanup(p.stdout.close)
- stdout = p.stdout.read()
- if hasattr(file, 'newlines'):
- # Interpreter with universal newline support
- self.assertEqual(stdout,
- "line1\nline2\nline3\nline4\nline5\nline6")
- else:
- # Interpreter without universal newline support
- self.assertEqual(stdout,
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")
-
- def test_universal_newlines_communicate(self):
- # universal newlines through communicate()
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys,os;' + SETBINARY +
- 'sys.stdout.write("line1\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line2\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line3\\r\\n");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("line4\\r");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline5");'
- 'sys.stdout.flush();'
- 'sys.stdout.write("\\nline6");'],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- universal_newlines=1)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- (stdout, stderr) = p.communicate()
- if hasattr(file, 'newlines'):
- # Interpreter with universal newline support
- self.assertEqual(stdout,
- "line1\nline2\nline3\nline4\nline5\nline6")
- else:
- # Interpreter without universal newline support
- self.assertEqual(stdout,
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")
-
- def test_no_leaking(self):
- if not test_support:
- raise SkipTest("No test_support module available.")
-
- # Make sure we leak no resources
- if not mswindows:
- max_handles = 1026 # too much for most UNIX systems
- else:
- max_handles = 2050 # too much for (at least some) Windows setups
- handles = []
- try:
- for i in range(max_handles):
- try:
- handles.append(os.open(test_support.TESTFN,
- os.O_WRONLY | os.O_CREAT))
- except OSError, e:
- if e.errno != errno.EMFILE:
- raise
- break
- else:
- # python-2.3 unittest doesn't have skipTest. Reimplement with nose
- #self.skipTest("failed to reach the file descriptor limit "
- # "(tried %d)" % max_handles)
- raise SkipTest("failed to reach the file descriptor limit "
- "(tried %d)" % max_handles)
-
- # Close a couple of them (should be enough for a subprocess)
- for i in range(10):
- os.close(handles.pop())
- # Loop creating some subprocesses. If one of them leaks some fds,
- # the next loop iteration will fail by reaching the max fd limit.
- for i in range(15):
- p = subprocess.Popen([sys.executable, "-c",
- "import sys;"
- "sys.stdout.write(sys.stdin.read())"],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- data = p.communicate("lime")[0]
- self.assertEqual(data, "lime")
- finally:
- for h in handles:
- os.close(h)
-
- def test_list2cmdline(self):
- self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
- '"a b c" d e')
- self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
- 'ab\\"c \\ d')
- self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
- 'ab\\"c " \\\\" d')
- self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
- 'a\\\\\\b "de fg" h')
- self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
- 'a\\\\\\"b c d')
- self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
- '"a\\\\b c" d e')
- self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
- '"a\\\\b\\ c" d e')
- self.assertEqual(subprocess.list2cmdline(['ab', '']),
- 'ab ""')
-
-
- def test_poll(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(1)"])
- count = 0
- while p.poll() is None:
- time.sleep(0.1)
- count += 1
- # We expect that the poll loop probably went around about 10 times,
- # but, based on system scheduling we can't control, it's possible
- # poll() never returned None. It "should be" very rare that it
- # didn't go around at least twice.
- #self.assertGreaterEqual(count, 2)
- self.assert_(count >= 2)
- # Subsequent invocations should just return the returncode
- self.assertEqual(p.poll(), 0)
-
-
- def test_wait(self):
- p = subprocess.Popen([sys.executable,
- "-c", "import time; time.sleep(2)"])
- self.assertEqual(p.wait(), 0)
- # Subsequent invocations should just return the returncode
- self.assertEqual(p.wait(), 0)
-
-
- def test_invalid_bufsize(self):
- # an invalid type of the bufsize argument should raise
- # TypeError.
- #with self.assertRaises(TypeError):
- try:
- subprocess.Popen([sys.executable, "-c", "pass"], "orange")
- except TypeError:
- pass
- else:
- self.fail("Expected TypeError")
-
- def test_leaking_fds_on_error(self):
- # see bug #5179: Popen leaks file descriptors to PIPEs if
- # the child fails to execute; this will eventually exhaust
- # the maximum number of open fds. 1024 seems a very common
- # value for that limit, but Windows has 2048, so we loop
- # 1024 times (each call leaked two fds).
- for i in range(1024):
- # Windows raises IOError. Others raise OSError.
- #with self.assertRaises(EnvironmentError) as c:
- try:
- subprocess.Popen(['nonexisting_i_hope'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- #if c.exception.errno != 2: # ignore "no such file"
- # raise c.exception
- # Windows raises IOError
- except EnvironmentError, err:
- if err.errno not in (errno.ENOENT, errno.EACCES): # ignore "no such file"
- raise err
- except:
- self.fail("Expected EnvironmentError")
- else:
- self.fail("Expected EnvironmentError")
-
- def test_handles_closed_on_exception(self):
- # If CreateProcess exits with an error, ensure the
- # duplicate output handles are released
- ifhandle, ifname = mkstemp()
- ofhandle, ofname = mkstemp()
- efhandle, efname = mkstemp()
- try:
- subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
- stderr=efhandle)
- except OSError:
- os.close(ifhandle)
- os.remove(ifname)
- os.close(ofhandle)
- os.remove(ofname)
- os.close(efhandle)
- os.remove(efname)
- self.assert_(not os.path.exists(ifname))
- self.assert_(not os.path.exists(ofname))
- self.assert_(not os.path.exists(efname))
-
- def test_communicate_epipe(self):
- # Issue 10963: communicate() should hide EPIPE
- p = subprocess.Popen([sys.executable, "-c", 'pass'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- p.communicate("x" * 2**20)
-
- def test_communicate_epipe_only_stdin(self):
- # Issue 10963: communicate() should hide EPIPE
- p = subprocess.Popen([sys.executable, "-c", 'pass'],
- stdin=subprocess.PIPE)
- self.addCleanup(p.stdin.close)
- time.sleep(2)
- p.communicate("x" * 2**20)
-
-# context manager
-class _SuppressCoreFiles(object):
- """Try to prevent core files from being created."""
- old_limit = None
-
- def __enter__(self):
- """Try to save previous ulimit, then set it to (0, 0)."""
- try:
- import resource
- self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
- resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
- except (ImportError, ValueError, resource.error):
- pass
-
- if sys.platform == 'darwin':
- # Check if the 'Crash Reporter' on OSX was configured
- # in 'Developer' mode and warn that it will get triggered
- # when it is.
- #
- # This assumes that this context manager is used in tests
- # that might trigger the next manager.
- value = subprocess.Popen(['/usr/bin/defaults', 'read',
- 'com.apple.CrashReporter', 'DialogType'],
- stdout=subprocess.PIPE).communicate()[0]
- if value.strip() == 'developer':
- print "this tests triggers the Crash Reporter, that is intentional"
- sys.stdout.flush()
-
- def __exit__(self, *args):
- """Return core file behavior to default."""
- if self.old_limit is None:
- return
- try:
- import resource
- resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
- except (ImportError, ValueError, resource.error):
- pass
-
- # python-2.3 unittest doesn't have skipUnless. Reimplement as SkipTest from nose
- #@unittest.skipUnless(hasattr(signal, 'SIGALRM'),
- # "Requires signal.SIGALRM")
- def test_communicate_eintr(self):
- if not hasattr(signal, 'SIGALRM'):
- raise SkipTest('Requires signal.SIGALRM')
- # Issue #12493: communicate() should handle EINTR
- def handler(signum, frame):
- pass
- old_handler = signal.signal(signal.SIGALRM, handler)
- self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
-
- # the process is running for 2 seconds
- args = [sys.executable, "-c", 'import time; time.sleep(2)']
- for stream in ('stdout', 'stderr'):
- kw = {stream: subprocess.PIPE}
- #with subprocess.Popen(args, **kw) as process:
- try:
- process = subprocess.Popen(args, **kw)
- signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
- process.communicate()
- finally:
- process.close()
-
-
-# Not available with python-2.3's unittest. Reimplement with SkipTest from
-# nose
-#@unittest.skipIf(mswindows, "POSIX specific tests")
-class POSIXProcessTestCase(BaseTestCase):
- def setUp(self):
- if mswindows:
- raise SkipTest('POSIX specific tests')
-
- def test_exceptions(self):
- # caught & re-raised exceptions
- #with self.assertRaises(OSError) as c:
- try:
- p = subprocess.Popen([sys.executable, "-c", ""],
- cwd="/this/path/does/not/exist")
- # The attribute child_traceback should contain "os.chdir" somewhere.
- #self.assertIn("os.chdir", c.exception.child_traceback)
- except OSError, e:
- self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
- except:
- self.fail("Expected OSError")
- else:
- self.fail("Expected OSError")
-
- def _suppress_core_files(self):
- """Try to prevent core files from being created.
- Returns previous ulimit if successful, else None.
- """
- try:
- import resource
- old_limit = resource.getrlimit(resource.RLIMIT_CORE)
- resource.setrlimit(resource.RLIMIT_CORE, (0,0))
- return old_limit
- except (ImportError, ValueError, resource.error):
- return None
-
- def _unsuppress_core_files(self, old_limit):
- """Return core file behavior to default."""
- if old_limit is None:
- return
- try:
- import resource
- resource.setrlimit(resource.RLIMIT_CORE, old_limit)
- except (ImportError, ValueError, resource.error):
- return
-
- def test_run_abort(self):
- # returncode handles signal termination
- #with _SuppressCoreFiles():
- old_limit = self._suppress_core_files()
- try:
- p = subprocess.Popen([sys.executable, "-c",
- "import os; os.abort()"])
- finally:
- self._unsuppress_core_files(old_limit)
-
- #p.wait()
- p.wait()
- self.assertEqual(-p.returncode, signal.SIGABRT)
-
- def test_preexec(self):
- # preexec function
- p = subprocess.Popen([sys.executable, "-c",
- "import sys, os;"
- "sys.stdout.write(os.getenv('FRUIT'))"],
- stdout=subprocess.PIPE,
- preexec_fn=lambda: os.putenv("FRUIT", "apple"))
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), "apple")
-
- def test_args_string(self):
- # args is a string
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
- # 0o is not available in python2.5
- #os.chmod(fname, 0o700)
- os.chmod(fname, 0700)
- p = subprocess.Popen(fname)
- p.wait()
- os.remove(fname)
- self.assertEqual(p.returncode, 47)
-
- def test_invalid_args(self):
- # invalid arguments should raise ValueError
- self.assertRaises(ValueError, subprocess.call,
- [sys.executable, "-c",
- "import sys; sys.exit(47)"],
- startupinfo=47)
- self.assertRaises(ValueError, subprocess.call,
- [sys.executable, "-c",
- "import sys; sys.exit(47)"],
- creationflags=47)
-
- def test_shell_sequence(self):
- # Run command through the shell (sequence)
- newenv = os.environ.copy()
- newenv["FRUIT"] = "apple"
- p = subprocess.Popen(["echo $FRUIT"], shell=1,
- stdout=subprocess.PIPE,
- env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), "apple")
-
- def test_shell_string(self):
- # Run command through the shell (string)
- newenv = os.environ.copy()
- newenv["FRUIT"] = "apple"
- p = subprocess.Popen("echo $FRUIT", shell=1,
- stdout=subprocess.PIPE,
- env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), "apple")
-
- def test_call_string(self):
- # call() function with string argument on UNIX
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
- os.chmod(fname, 0700)
- rc = subprocess.call(fname)
- os.remove(fname)
- self.assertEqual(rc, 47)
-
- def test_specific_shell(self):
- # Issue #9265: Incorrect name passed as arg[0].
- shells = []
- for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
- for name in ['bash', 'ksh']:
- sh = os.path.join(prefix, name)
- if os.path.isfile(sh):
- shells.append(sh)
- if not shells: # Will probably work for any shell but csh.
-
- # skipTest unavailable on python<2.7 reimplement with nose
- #self.skipTest("bash or ksh required for this test")
- raise SkipTest("bash or ksh required for this test")
- sh = '/bin/sh'
- if os.path.isfile(sh) and not os.path.islink(sh):
- # Test will fail if /bin/sh is a symlink to csh.
- shells.append(sh)
- for sh in shells:
- p = subprocess.Popen("echo $0", executable=sh, shell=True,
- stdout=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), sh)
-
- def _kill_process(self, method, *args):
- # Do not inherit file handles from the parent.
- # It should fix failures on some platforms.
- p = subprocess.Popen([sys.executable, "-c", """if 1:
- import sys, time
- sys.stdout.write('x\\n')
- sys.stdout.flush()
- time.sleep(30)
- """],
- close_fds=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- # Wait for the interpreter to be completely initialized before
- # sending any signal.
- p.stdout.read(1)
- getattr(p, method)(*args)
- return p
-
- # These are still hanging with x86_64 Fedora 12 (python-2.6) subprocess
- # backport from current trunk run under nosetests
-
- def test_send_signal(self):
- #if hang DISABLED #2777
- p = self._kill_process('send_signal', signal.SIGINT)
- _, stderr = p.communicate()
- self.assert_('KeyboardInterrupt' in stderr)
- self.assertNotEqual(p.wait(), 0)
-
- def test_kill(self):
- #if hang DISABLED #2777
- p = self._kill_process('kill')
- _, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
- self.assertEqual(p.wait(), -signal.SIGKILL)
-
- def test_terminate(self):
- #if hang DISABLED #2777
- p = self._kill_process('terminate')
- _, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
- self.assertEqual(p.wait(), -signal.SIGTERM)
-
- def check_close_std_fds(self, fds):
- # Issue #9905: test that subprocess pipes still work properly with
- # some standard fds closed
- stdin = 0
- newfds = []
- for a in fds:
- b = os.dup(a)
- newfds.append(b)
- if a == 0:
- stdin = b
- try:
- for fd in fds:
- os.close(fd)
- out, err = subprocess.Popen([sys.executable, "-c",
- 'import sys;'
- 'sys.stdout.write("apple");'
- 'sys.stdout.flush();'
- 'sys.stderr.write("orange")'],
- stdin=stdin,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE).communicate()
- err = re.sub(r"\[\d+ refs\]\r?\n?$", "", err).strip()
- self.assertEqual((out, err), ('apple', 'orange'))
- finally:
- for b, a in zip(newfds, fds):
- os.dup2(b, a)
- for b in newfds:
- os.close(b)
-
- def test_close_fd_0(self):
- self.check_close_std_fds([0])
-
- def test_close_fd_1(self):
- self.check_close_std_fds([1])
-
- def test_close_fd_2(self):
- self.check_close_std_fds([2])
-
- def test_close_fds_0_1(self):
- self.check_close_std_fds([0, 1])
-
- def test_close_fds_0_2(self):
- self.check_close_std_fds([0, 2])
-
- def test_close_fds_1_2(self):
- self.check_close_std_fds([1, 2])
-
- def test_close_fds_0_1_2(self):
- # Issue #10806: test that subprocess pipes still work properly with
- # all standard fds closed.
- self.check_close_std_fds([0, 1, 2])
-
- def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
- # open up some temporary files
- temps = [mkstemp() for i in range(3)]
- temp_fds = [fd for fd, fname in temps]
- try:
- # unlink the files -- we won't need to reopen them
- for fd, fname in temps:
- os.unlink(fname)
-
- # save a copy of the standard file descriptors
- saved_fds = [os.dup(fd) for fd in range(3)]
- try:
- # duplicate the temp files over the standard fd's 0, 1, 2
- for fd, temp_fd in enumerate(temp_fds):
- os.dup2(temp_fd, fd)
-
- # write some data to what will become stdin, and rewind
- os.write(stdin_no, "STDIN")
- os.lseek(stdin_no, 0, 0)
-
- # now use those files in the given order, so that subprocess
- # has to rearrange them in the child
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys; got = sys.stdin.read();'
- 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
- stdin=stdin_no,
- stdout=stdout_no,
- stderr=stderr_no)
- p.wait()
-
- for fd in temp_fds:
- os.lseek(fd, 0, 0)
-
- out = os.read(stdout_no, 1024)
- err = re.sub(r"\[\d+ refs\]\r?\n?$", "", os.read(stderr_no, 1024)).strip()
- finally:
- for std, saved in enumerate(saved_fds):
- os.dup2(saved, std)
- os.close(saved)
-
- self.assertEqual(out, "got STDIN")
- self.assertEqual(err, "err")
-
- finally:
- for fd in temp_fds:
- os.close(fd)
-
- # When duping fds, if there arises a situation where one of the fds is
- # either 0, 1 or 2, it is possible that it is overwritten (#12607).
- # This tests all combinations of this.
- def test_swap_fds(self):
- self.check_swap_fds(0, 1, 2)
- self.check_swap_fds(0, 2, 1)
- self.check_swap_fds(1, 0, 2)
- self.check_swap_fds(1, 2, 0)
- self.check_swap_fds(2, 0, 1)
- self.check_swap_fds(2, 1, 0)
-
- def test_wait_when_sigchild_ignored(self):
- # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
- if not test_support:
- raise SkipTest("No test_support module available.")
- sigchild_ignore = test_support.findfile(os.path.join("subprocessdata",
- "sigchild_ignore.py"))
- p = subprocess.Popen([sys.executable, sigchild_ignore],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
- " non-zero with this error:\n%s" % stderr)
-
-
- def test_zombie_fast_process_del(self):
- # Issue #12650: on Unix, if Popen.__del__() was called before the
- # process exited, it wouldn't be added to subprocess._active, and would
- # remain a zombie.
- # spawn a Popen, and delete its reference before it exits
- p = subprocess.Popen([sys.executable, "-c",
- 'import sys, time;'
- 'time.sleep(0.2)'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- ident = id(p)
- pid = p.pid
- del p
- # check that p is in the active processes list
- # assertIn not in python< 2.7
- #self.assertIn(ident, [id(o) for o in subprocess._active])
- self.assert_(ident in [id(o) for o in subprocess._active])
-
- def test_leak_fast_process_del_killed(self):
- # Issue #12650: on Unix, if Popen.__del__() was called before the
- # process exited, and the process got killed by a signal, it would never
- # be removed from subprocess._active, which triggered a FD and memory
- # leak.
- # spawn a Popen, delete its reference and kill it
- p = subprocess.Popen([sys.executable, "-c",
- 'import time;'
- 'time.sleep(3)'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- ident = id(p)
- pid = p.pid
- del p
- os.kill(pid, signal.SIGKILL)
- # check that p is in the active processes list
- # assertIn not in python < 2.7
- self.assert_(ident in [id(o) for o in subprocess._active])
- #self.assertIn(ident, [id(o) for o in subprocess._active])
-
- # let some time for the process to exit, and create a new Popen: this
- # should trigger the wait() of p
- time.sleep(0.2)
- #with self.assertRaises(EnvironmentError) as c:
- try:
- try:
- proc = subprocess.Popen(['nonexisting_i_hope'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- finally:
- pass
- except EnvironmentError:
- pass
- except:
- self.fail('Expected EnvironmentError')
- else:
- self.fail('Expected EnvironmentError')
- # p should have been wait()ed on, and removed from the _active list
- #self.assertRaises(OSError, os.waitpid, pid, 0)
- try:
- os.waitpid(pid, 0)
- except OSError:
- pass
- except:
- self.fail('Expected OSError')
- else:
- self.fail('Expected OSError')
- #self.assertNotIn(ident, [id(o) for o in subprocess._active])
- self.assert_(ident not in [id(o) for o in subprocess._active])
-
- def test_pipe_cloexec(self):
- # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
- # and are not inherited by another child process.
- p1 = subprocess.Popen([sys.executable, "-c",
- 'import os;'
- 'os.read(0, 1)'
- ],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- p2 = subprocess.Popen([sys.executable, "-c", """if True:
- import os, errno, sys
- for fd in %r:
- try:
- os.close(fd)
- except OSError, e:
- if e.errno != errno.EBADF:
- raise
- else:
- sys.exit(1)
- sys.exit(0)
- """ % [f.fileno() for f in (p1.stdin, p1.stdout,
- p1.stderr)]
- ],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, close_fds=False)
- p1.communicate('foo')
- _, stderr = p2.communicate()
-
- self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
-
-
-# Not available with python-2.3's unittest, reimplement with SkipTest from
-# nose
-##@unittest.skipUnless(mswindows, "Windows specific tests")
-class Win32ProcessTestCase(BaseTestCase):
- def setUp(self):
- if not mswindows:
- raise SkipTest('Windows specific tests')
-
- def test_startupinfo(self):
- # startupinfo argument
- # We uses hardcoded constants, because we do not want to
- # depend on win32all.
- STARTF_USESHOWWINDOW = 1
- SW_MAXIMIZE = 3
- startupinfo = subprocess.STARTUPINFO()
- startupinfo.dwFlags = STARTF_USESHOWWINDOW
- startupinfo.wShowWindow = SW_MAXIMIZE
- # Since Python is a console process, it won't be affected
- # by wShowWindow, but the argument should be silently
- # ignored
- subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
- startupinfo=startupinfo)
-
- def test_creationflags(self):
- # creationflags argument
- CREATE_NEW_CONSOLE = 16
- sys.stderr.write(" a DOS box should flash briefly ...\n")
- subprocess.call(sys.executable +
- ' -c "import time; time.sleep(0.25)"',
- creationflags=CREATE_NEW_CONSOLE)
-
- def test_invalid_args(self):
- # invalid arguments should raise ValueError
- self.assertRaises(ValueError, subprocess.call,
- [sys.executable, "-c",
- "import sys; sys.exit(47)"],
- preexec_fn=lambda: 1)
- self.assertRaises(ValueError, subprocess.call,
- [sys.executable, "-c",
- "import sys; sys.exit(47)"],
- stdout=subprocess.PIPE,
- close_fds=True)
-
- def test_close_fds(self):
- # close file descriptors
- rc = subprocess.call([sys.executable, "-c",
- "import sys; sys.exit(47)"],
- close_fds=True)
- self.assertEqual(rc, 47)
-
- def test_shell_sequence(self):
- # Run command through the shell (sequence)
- newenv = os.environ.copy()
- newenv["FRUIT"] = "physalis"
- p = subprocess.Popen(["set"], shell=1,
- stdout=subprocess.PIPE,
- env=newenv)
- self.addCleanup(p.stdout.close)
- #self.assertIn("physalis", p.stdout.read())
- self.assert_("physalis" in p.stdout.read())
-
- def test_shell_string(self):
- # Run command through the shell (string)
- newenv = os.environ.copy()
- newenv["FRUIT"] = "physalis"
- p = subprocess.Popen("set", shell=1,
- stdout=subprocess.PIPE,
- env=newenv)
- self.addCleanup(p.stdout.close)
- #self.assertIn("physalis", p.stdout.read())
- self.assert_("physalis" in p.stdout.read())
-
- def test_call_string(self):
- # call() function with string argument on Windows
- rc = subprocess.call(sys.executable +
- ' -c "import sys; sys.exit(47)"')
- self.assertEqual(rc, 47)
-
- def _kill_process(self, method, *args):
- # Some win32 buildbot raises EOFError if stdin is inherited
- p = subprocess.Popen([sys.executable, "-c", """if 1:
- import sys, time
- sys.stdout.write('x\\n')
- sys.stdout.flush()
- time.sleep(30)
- """],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- # Wait for the interpreter to be completely initialized before
- # sending any signal.
- p.stdout.read(1)
- getattr(p, method)(*args)
- _, stderr = p.communicate()
- self.assertStderrEqual(stderr, '')
- returncode = p.wait()
- self.assertNotEqual(returncode, 0)
-
- def test_send_signal(self):
- #if hang DISABLED #2777
- self._kill_process('send_signal', signal.SIGTERM)
-
- def test_kill(self):
- #if hang DISABLED #2777
- self._kill_process('kill')
-
- def test_terminate(self):
- #if hang DISABLED #2777
- self._kill_process('terminate')
-
-
-# Not available with python-2.3's unittest. reimplement with SkipTest from
-# nose
-#@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
-# "poll system call not supported")
-class ProcessTestCaseNoPoll(ProcessTestCase):
- def setUp(self):
- if not getattr(subprocess, '_has_poll', False):
- raise SkipTest('poll system call not supported')
- subprocess._has_poll = False
- ProcessTestCase.setUp(self)
-
- def tearDown(self):
- subprocess._has_poll = True
- ProcessTestCase.tearDown(self)
-
-
-class HelperFunctionTests(unittest.TestCase):
- # Not available with python-2.3's unittest. reimplement with SkipTest from
- # nose
- #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
- def test_eintr_retry_call(self):
- if mswindows:
- raise SkipTest('errno and EINTR make no sense on windows')
- record_calls = []
- def fake_os_func(*args):
- record_calls.append(args)
- if len(record_calls) == 2:
- raise OSError(errno.EINTR, "fake interrupted system call")
- # reversed() is not available in python-2.3
- args = list(args)
- args.reverse()
- return tuple(args)
-
- self.assertEqual((999, 256),
- subprocess._eintr_retry_call(fake_os_func, 256, 999))
- self.assertEqual([(256, 999)], record_calls)
- # This time there will be an EINTR so it will loop once.
- self.assertEqual((666,),
- subprocess._eintr_retry_call(fake_os_func, 666))
- self.assertEqual([(256, 999), (666,), (666,)], record_calls)
-
-
-# SkipUnless is not available on python < 2.7, reimplement with nose
-#@unittest.skipUnless(mswindows, "mswindows only")
-class CommandsWithSpaces (BaseTestCase):
-
- def setUp(self):
- if not mswindows:
- raise SkipTest('mswindows only')
-
- super(CommandsWithSpaces, self).setUp()
- f, fname = mkstemp(".py", "te st")
- self.fname = fname.lower ()
- os.write(f, "import sys;"
- "sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
- )
- os.close(f)
-
- def tearDown(self):
- os.remove(self.fname)
- super(CommandsWithSpaces, self).tearDown()
-
- def with_spaces(self, *args, **kwargs):
- kwargs['stdout'] = subprocess.PIPE
- p = subprocess.Popen(*args, **kwargs)
- self.addCleanup(p.stdout.close)
- self.assertEqual(
- p.stdout.read ().decode("mbcs"),
- "2 [%r, 'ab cd']" % self.fname
- )
-
- def test_shell_string_with_spaces(self):
- # call() function with string argument with spaces on Windows
- self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
- "ab cd"), shell=1)
-
- def test_shell_sequence_with_spaces(self):
- # call() function with sequence argument with spaces on Windows
- self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
-
- def test_noshell_string_with_spaces(self):
- # call() function with string argument with spaces on Windows
- self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
- "ab cd"))
-
- def test_noshell_sequence_with_spaces(self):
- # call() function with sequence argument with spaces on Windows
- self.with_spaces([sys.executable, self.fname, "ab cd"])
-
-
-# We're using nosetests so we don't need this. This just leads to tests being
-# run twice
-#def test_main():
-# unit_tests = (ProcessTestCase,
-# POSIXProcessTestCase,
-# Win32ProcessTestCase,
-# ProcessTestCaseNoPoll,
-# HelperFunctionTests
-# )
-#
-# test_support.run_unittest(*unit_tests)
-# test_support.reap_children()
-#
-#if __name__ == "__main__":
-# test_main()
diff --git a/kitchen3/docs/api-overview.rst b/kitchen3/docs/api-overview.rst
index dda56fe..e53a94d 100644
--- a/kitchen3/docs/api-overview.rst
+++ b/kitchen3/docs/api-overview.rst
@@ -16,9 +16,6 @@ that may drag in more dependencies can be found on the `project webpage`_
api-collections
api-iterutils
api-versioning
- api-pycompat24
- api-pycompat25
- api-pycompat27
api-exceptions
.. _`project webpage`: https://fedorahosted.org/kitchen
diff --git a/kitchen3/docs/api-pycompat24.rst b/kitchen3/docs/api-pycompat24.rst
deleted file mode 100644
index a3247b6..0000000
--- a/kitchen3/docs/api-pycompat24.rst
+++ /dev/null
@@ -1,34 +0,0 @@
-=======================
-Python 2.4 Compatibiity
-=======================
-
-
--------------------
-Sets for python-2.3
--------------------
-
-.. automodule:: kitchen.pycompat24.sets
-.. autofunction:: kitchen.pycompat24.sets.add_builtin_set
-
-----------------------------------
-Partial new style base64 interface
-----------------------------------
-
-.. automodule:: kitchen.pycompat24.base64
- :members:
-
-----------
-Subprocess
-----------
-
-.. seealso::
-
- :mod:`kitchen.pycompat27.subprocess`
- Kitchen includes the python-2.7 version of subprocess which has a new
- function, :func:`~kitchen.pycompat27.subprocess.check_output`. When
- you import :mod:`pycompat24.subprocess` you will be getting the
- python-2.7 version of subprocess rather than the 2.4 version (where
- subprocess first appeared). This choice was made so that we can
- concentrate our efforts on keeping the single version of subprocess up
- to date rather than working on a 2.4 version that very few people
- would need specifically.
diff --git a/kitchen3/docs/api-pycompat25.rst b/kitchen3/docs/api-pycompat25.rst
deleted file mode 100644
index 2323c2f..0000000
--- a/kitchen3/docs/api-pycompat25.rst
+++ /dev/null
@@ -1,8 +0,0 @@
-========================
-Python 2.5 Compatibility
-========================
-
-.. automodule:: kitchen.pycompat25
-
-.. automodule:: kitchen.pycompat25.collections.defaultdict
-
diff --git a/kitchen3/docs/api-pycompat27.rst b/kitchen3/docs/api-pycompat27.rst
deleted file mode 100644
index 6ef6db1..0000000
--- a/kitchen3/docs/api-pycompat27.rst
+++ /dev/null
@@ -1,35 +0,0 @@
-========================
-Python 2.7 Compatibility
-========================
-
-.. module:: kitchen.pycompat27.subprocess
-
---------------------------
-Subprocess from Python 2.7
---------------------------
-
-The :mod:`subprocess` module included here is a direct import from
-python-2.7's |stdlib|_. You can access it via::
-
- >>> from kitchen.pycompat27 import subprocess
-
-The motivation for including this module is that various API changing
-improvements have been made to subprocess over time. The following is a list
-of the known changes to :mod:`subprocess` with the python version they were
-introduced in:
-
-==================================== ===
-New API Feature Ver
-==================================== ===
-:exc:`subprocess.CalledProcessError` 2.5
-:func:`subprocess.check_call` 2.5
-:func:`subprocess.check_output` 2.7
-:meth:`subprocess.Popen.send_signal` 2.6
-:meth:`subprocess.Popen.terminate` 2.6
-:meth:`subprocess.Popen.kill` 2.6
-==================================== ===
-
-.. seealso::
-
- The stdlib :mod:`subprocess` documenation
- For complete documentation on how to use subprocess
diff --git a/kitchen3/tests/subprocessdata/sigchild_ignore.py b/kitchen3/tests/subprocessdata/sigchild_ignore.py
deleted file mode 100644
index 5b6dd08..0000000
--- a/kitchen3/tests/subprocessdata/sigchild_ignore.py
+++ /dev/null
@@ -1,11 +0,0 @@
-import os
-import signal, sys
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
-
-from kitchen.pycompat27.subprocess import _subprocess as subprocess
-
-# On Linux this causes os.waitpid to fail with OSError as the OS has already
-# reaped our child process. The wait() passing the OSError on to the caller
-# and causing us to exit with an error is what we are testing against.
-signal.signal(signal.SIGCHLD, signal.SIG_IGN)
-subprocess.Popen([sys.executable, '-c', 'print("albatross")']).wait()
diff --git a/kitchen3/tests/test__all__.py b/kitchen3/tests/test__all__.py
index d25cb3f..7ceef8b 100644
--- a/kitchen3/tests/test__all__.py
+++ b/kitchen3/tests/test__all__.py
@@ -4,8 +4,6 @@ from nose import tools
import os
import types
import warnings
-from kitchen.pycompat24.sets import add_builtin_set
-add_builtin_set()
def logit(msg):
log = open('/var/tmp/test.log', 'a')
diff --git a/kitchen3/tests/test_collections.py b/kitchen3/tests/test_collections.py
index e3da84b..12a6d04 100644
--- a/kitchen3/tests/test_collections.py
+++ b/kitchen3/tests/test_collections.py
@@ -3,9 +3,6 @@
import unittest
from nose import tools
-from kitchen.pycompat24.sets import add_builtin_set
-add_builtin_set()
-
from kitchen import collections
def test_strict_dict_get_set():
diff --git a/kitchen3/tests/test_deprecation_py3.py b/kitchen3/tests/test_deprecation_py3.py
deleted file mode 100644
index f03c0df..0000000
--- a/kitchen3/tests/test_deprecation_py3.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from nose import tools
-
-import sys
-import warnings
-
-import importlib
-from kitchen.pycompat25.collections import defaultdict
-
-class TestPendingDeprecationModules(object):
- def __init__(self):
- kitchen_path = 'kitchen'
- collections_path = 'kitchen/collections'
- pycompat24_path = 'kitchen/pycompat24'
- pycompat25_path = 'kitchen/pycompat25'
- pycompat27_path = 'kitchen/pycompat27'
-
- self.module_data = (
- ('strictdict', 'kitchen.collections.strictdict', collections_path),
- ('pycompat24', 'kitchen.pycompat24', kitchen_path),
- ('base64', 'kitchen.pycompat24.base64', pycompat24_path),
- ('sets', 'kitchen.pycompat24.sets', pycompat24_path),
- ('subprocess', 'kitchen.pycompat24.subprocess', pycompat24_path),
- ('pycompat25', 'kitchen.pycompat25', kitchen_path),
- ('collections', 'kitchen.pycompat25.collections', pycompat25_path),
- ('pycompat27', 'kitchen.pycompat27', kitchen_path),
- ('subprocess', 'kitchen.pycompat27.subprocess', pycompat27_path),
- )
-
- def setUp(self):
- for module in sys.modules.values():
- if hasattr(module, '__warningregistry__'):
- del module.__warningregistry__
-
- def check_modules(self, module_name, module_fqn, module_path):
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always')
- # imp.load_module will load even if it has already been loaded.
- # We need to ensure that happens in order to trigger the
- # deprecation warnings
- importlib.find_loader(module_fqn, module_path).load_module()
- warning_raised = False
- for warning in (e.message for e in w):
- if isinstance(warning, PendingDeprecationWarning) and \
- ('%s is deprecated' % module_name) in warning.args[0]:
- warning_raised = True
- break
- tools.assert_true(warning_raised, msg='%s did not raise a PendingDeprecationWarning' % module_fqn)
-
- def test_modules(self):
- for mod in self.module_data:
- yield self.check_modules, mod[0], mod[1], mod[2]
-
- def test_defaultdict(self):
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always')
- defaultdict()
- warning_raised = False
- for warning in (e.message for e in w):
- if isinstance(warning, PendingDeprecationWarning) and \
- ('defaultdict is deprecated') in warning.args[0]:
- warning_raised = True
- break
- tools.assert_true(warning_raised, msg='kitchen.pycompat25.collections.defaultdict did not raise a PendingDeprecationWarning')
diff --git a/kitchen3/tests/test_pycompat.py b/kitchen3/tests/test_pycompat.py
deleted file mode 100644
index 50a059b..0000000
--- a/kitchen3/tests/test_pycompat.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-import unittest
-from nose import tools
-
-class TestUsableModules(unittest.TestCase):
- def test_subprocess(self):
- '''Test that importing subprocess as a module works
- '''
- try:
- from kitchen.pycompat24.subprocess import Popen
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat24.subprocess as a module')
- try:
- from kitchen.pycompat27.subprocess import Popen
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat27.subprocess as a module')
-
- def test_base64(self):
- '''Test that importing base64 as a module works
- '''
- try:
- from kitchen.pycompat24.base64 import b64encode
- except ImportError:
- tools.ok_(False, 'Unable to import pycompat24.base64 as a module')
diff --git a/setup.py b/setup.py
index 4813cae..e1a5c48 100755
--- a/setup.py
+++ b/setup.py
@@ -15,13 +15,6 @@ if sys.version_info[0] == 2:
'kitchen.iterutils',
'kitchen.collections',
'kitchen.text',
- 'kitchen.pycompat24',
- 'kitchen.pycompat24.base64',
- 'kitchen.pycompat24.sets',
- 'kitchen.pycompat25',
- 'kitchen.pycompat25.collections',
- 'kitchen.pycompat27',
- 'kitchen.pycompat27.subprocess',
]
elif sys.version_info[0] == 3:
source_dir = 'kitchen3'
@@ -32,13 +25,6 @@ elif sys.version_info[0] == 3:
'kitchen.iterutils',
'kitchen.collections',
'kitchen.text',
- 'kitchen.pycompat24',
- 'kitchen.pycompat24.base64',
- 'kitchen.pycompat24.sets',
- 'kitchen.pycompat25',
- 'kitchen.pycompat25.collections',
- 'kitchen.pycompat27',
- 'kitchen.pycompat27.subprocess',
]
else:
raise NotImplementedError("Python version unsupported %r" % sys.version)