forked from mirrors/gecko-dev
		
	
		
			
				
	
	
		
			464 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			464 lines
		
	
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
 | 
						|
"""Windows specific tests.  These are implicitly run by test_psutil.py."""
 | 
						|
 | 
						|
import errno
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import signal
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import traceback
 | 
						|
 | 
						|
from test_psutil import APPVEYOR, WINDOWS
 | 
						|
from test_psutil import get_test_subprocess, reap_children, unittest
 | 
						|
 | 
						|
import mock
 | 
						|
try:
 | 
						|
    import wmi
 | 
						|
except ImportError:
 | 
						|
    wmi = None
 | 
						|
try:
 | 
						|
    import win32api
 | 
						|
    import win32con
 | 
						|
except ImportError:
 | 
						|
    win32api = win32con = None
 | 
						|
 | 
						|
from psutil._compat import PY3, callable, long
 | 
						|
import psutil
 | 
						|
 | 
						|
 | 
						|
# Alias for psutil's platform-specific extension module; the tests below
# look up and call its proc_* functions directly.
cext = psutil._psplatform.cext
 | 
						|
 | 
						|
 | 
						|
def wrap_exceptions(fun):
    """Decorator translating OSError into the matching psutil exception.

    The returned wrapper calls ``fun(self, *args, **kwargs)`` and returns
    its result.  If an OSError is raised:

    - errno in ``psutil._pswindows.ACCESS_DENIED_SET`` -> the wrapper
      raises ``psutil.AccessDenied(None, None)``;
    - errno == ``errno.ESRCH`` -> the wrapper raises
      ``psutil.NoSuchProcess(None, None)``;
    - any other errno -> the original OSError is re-raised unchanged.

    The wrapper keeps the name and docstring of ``fun`` so that failure
    messages and tracebacks point at the real function.
    """
    # Local import, in the same style as the ACCESS_DENIED_SET import below.
    import functools

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except OSError as err:
            # Imported lazily: psutil._pswindows is only needed on failure.
            from psutil._pswindows import ACCESS_DENIED_SET
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(WINDOWS, "not a Windows system")
class WindowsSpecificTestCase(unittest.TestCase):
    """Windows-only tests comparing psutil with other information sources.

    psutil results are checked against the output of ``ipconfig``, WMI
    (``wmi`` module) and pywin32 (``win32api`` / ``win32con``).  Tests
    that rely on one of the optional modules are skipped when that module
    is not installed.  ``cls.pid`` is the PID of a subprocess spawned
    once for the whole class.
    """

    @classmethod
    def setUpClass(cls):
        # One child process is shared by all the tests of the class.
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_issue_24(self):
        # Killing PID 0 is expected to raise AccessDenied.
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name(), 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username()
        self.assertGreaterEqual(p.create_time(), 0.0)
        try:
            rss, vms = p.memory_info()
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertGreater(rss, 0)

    def test_send_signal(self):
        # SIGINT is expected to be rejected with ValueError.
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_nic_names(self):
        # Every NIC reported by psutil must appear in 'ipconfig /all'.
        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = p.communicate()[0]
        if PY3:
            out = str(out, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True):
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail(
                    "%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_exe(self):
        # The executable's base name must match the process name; processes
        # for which psutil raises an error are skipped.
        for p in psutil.process_iter():
            try:
                self.assertEqual(os.path.basename(p.exe()), p.name())
            except psutil.Error:
                pass

    # --- Process class tests

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_name(self):
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        self.assertEqual(p.name(), w.Caption)

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_exe(self):
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        # Note: wmi reports the exe as a lower case string.
        # Since Windows paths are case-insensitive we ignore that.
        self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_cmdline(self):
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        # The WMI command line is compared with its double quotes removed.
        self.assertEqual(' '.join(p.cmdline()),
                         w.CommandLine.replace('"', ''))

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_username(self):
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        domain, _, username = w.GetOwner()
        username = "%s\\%s" % (domain, username)
        self.assertEqual(p.username(), username)

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_rss_memory(self):
        time.sleep(0.1)
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        rss = p.memory_info().rss
        self.assertEqual(rss, int(w.WorkingSetSize))

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_vms_memory(self):
        time.sleep(0.1)
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        vms = p.memory_info().vms
        # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
        # ...claims that PageFileUsage is represented in Kilo
        # bytes but funnily enough on certain platforms bytes are
        # returned instead.
        wmi_usage = int(w.PageFileUsage)
        if vms not in (wmi_usage, wmi_usage * 1024):
            self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_process_create_time(self):
        w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
        p = psutil.Process(self.pid)
        # Both timestamps are compared as YYYYmmddHHMMSS strings.
        wmic_create = str(w.CreationDate.split('.')[0])
        psutil_create = time.strftime("%Y%m%d%H%M%S",
                                      time.localtime(p.create_time()))
        self.assertEqual(wmic_create, psutil_create)

    # --- psutil namespace functions and constants tests

    @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                         'NUMBER_OF_PROCESSORS env var is not available')
    def test_cpu_count(self):
        num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
        self.assertEqual(num_cpus, psutil.cpu_count())

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_total_phymem(self):
        w = wmi.WMI().Win32_ComputerSystem()[0]
        self.assertEqual(int(w.TotalPhysicalMemory),
                         psutil.virtual_memory().total)

    # @unittest.skipIf(wmi is None, "wmi module is not installed")
    # def test__UPTIME(self):
    #     # _UPTIME constant is not public but it is used internally
    #     # as value to return for pid 0 creation time.
    #     # WMI behaves the same.
    #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    #     p = psutil.Process(0)
    #     wmic_create = str(w.CreationDate.split('.')[0])
    #     psutil_create = time.strftime("%Y%m%d%H%M%S",
    #                                   time.localtime(p.create_time()))
    #

    # Note: this test is not very reliable
    @unittest.skipIf(wmi is None, "wmi module is not installed")
    @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        w = wmi.WMI().Win32_Process()
        wmi_pids = set(x.ProcessId for x in w)
        psutil_pids = set(psutil.pids())
        self.assertEqual(wmi_pids, psutil_pids)

    @unittest.skipIf(wmi is None, "wmi module is not installed")
    def test_disks(self):
        ps_parts = psutil.disk_partitions(all=True)
        wmi_parts = wmi.WMI().Win32_LogicalDisk()
        for ps_part in ps_parts:
            for wmi_part in wmi_parts:
                if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                    if not ps_part.mountpoint:
                        # this is usually a CD-ROM with no disk inserted
                        break
                    try:
                        usage = psutil.disk_usage(ps_part.mountpoint)
                    except OSError as err:
                        if err.errno == errno.ENOENT:
                            # usually this is the floppy
                            break
                        else:
                            raise
                    self.assertEqual(usage.total, int(wmi_part.Size))
                    wmi_free = int(wmi_part.FreeSpace)
                    # Free space is sampled at two different moments, so
                    # it is compared with a 10 MB tolerance rather than
                    # for exact equality (an exact check here would make
                    # the tolerance pointless).
                    if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                        self.fail("psutil=%s, wmi=%s" % (
                            usage.free, wmi_free))
                    break
            else:
                # No WMI logical disk matched this psutil partition.
                self.fail("can't find partition %s" % repr(ps_part))

    @unittest.skipIf(win32api is None, "pywin32 module is not installed")
    def test_num_handles(self):
        # Opening a handle to ourselves must bump num_handles() by one
        # and closing it must bring the count back.
        p = psutil.Process(os.getpid())
        before = p.num_handles()
        handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                      win32con.FALSE, os.getpid())
        try:
            after = p.num_handles()
        finally:
            # Always release the handle, even if num_handles() raises.
            win32api.CloseHandle(handle)
        self.assertEqual(after, before + 1)
        self.assertEqual(p.num_handles(), before)

    @unittest.skipIf(win32api is None, "pywin32 module is not installed")
    def test_num_handles_2(self):
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, attr):
            # Look up Process.<attr> by the name passed in and invoke it
            # if it is callable; a plain attribute is merely accessed.
            member = getattr(p, attr, None)
            if member is not None and callable(member):
                member()

        # Methods which alter the process state or are otherwise not
        # exercised by this check.
        excluded = ('terminate', 'kill', 'suspend', 'resume',
                    'nice', 'send_signal', 'wait', 'children',
                    'as_dict')

        p = psutil.Process(self.pid)
        failures = []
        for name in dir(psutil.Process):
            if name.startswith('_') or name in excluded:
                continue
            try:
                call(p, name)
                num1 = p.num_handles()
                call(p, name)
                num2 = p.num_handles()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
            else:
                # A growing handle count after a repeated call suggests
                # the method leaks a handle.
                if num2 > num1:
                    fail = \
                        "failure while processing Process.%s method " \
                        "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))

    def test_name_always_available(self):
        # On Windows name() is never supposed to raise AccessDenied,
        # see https://github.com/giampaolo/psutil/issues/627
        for p in psutil.process_iter():
            try:
                p.name()
            except psutil.NoSuchProcess:
                # The process disappeared while iterating: not an error.
                pass
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(WINDOWS, "not a Windows system")
class TestDualProcessImplementation(unittest.TestCase):
    """
    Certain APIs on Windows have 2 internal implementations, one
    based on documented Windows APIs, another one based on
    NtQuerySystemInformation() which gets called as fallback in
    case the first fails because of limited permission error.
    Here we test that the two methods return the same value (within
    the per-function tolerance listed in ``fun_names``), see:
    https://github.com/giampaolo/psutil/issues/304
    """

    fun_names = [
        # function name, tolerance
        ('proc_cpu_times', 0.2),
        ('proc_create_time', 0.5),
        ('proc_num_handles', 1),  # 1 because impl #1 opens a handle
        ('proc_memory_info', 1024),  # KB
        ('proc_io_counters', 0),
    ]

    @classmethod
    def setUpClass(cls):
        # The mocked-fallback tests sample this child process instead of
        # the test runner itself, whose own counters keep changing while
        # the tests execute.
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def assert_within_tolerance(self, first, second, fun_name):
        # Compare two results (scalars, or tuples compared field by
        # field) using the tolerance declared in fun_names for fun_name.
        tolerance = dict(self.fun_names)[fun_name]
        if isinstance(first, tuple):
            self.assertEqual(len(first), len(second))
            pairs = list(zip(first, second))
        else:
            pairs = [(first, second)]
        for a, b in pairs:
            self.assertLessEqual(
                abs(a - b), tolerance,
                msg="%s: %r != %r (tolerance=%r)" % (
                    fun_name, first, second, tolerance))

    def test_compare_values(self):
        def assert_ge_0(obj):
            if isinstance(obj, tuple):
                for value in obj:
                    self.assertGreaterEqual(value, 0, msg=obj)
            elif isinstance(obj, (int, long, float)):
                self.assertGreaterEqual(obj, 0)
            else:
                # case not handled which needs to be fixed
                self.fail("unhandled type %r" % type(obj))

        def compare_with_tolerance(ret1, ret2, tolerance):
            if ret1 == ret2:
                return
            if isinstance(ret2, (int, long, float)):
                diff = abs(ret1 - ret2)
                self.assertLessEqual(diff, tolerance)
            elif isinstance(ret2, tuple):
                for a, b in zip(ret1, ret2):
                    diff = abs(a - b)
                    self.assertLessEqual(diff, tolerance)

        from psutil._pswindows import ntpinfo
        failures = []
        for p in psutil.process_iter():
            try:
                nt = ntpinfo(*cext.proc_info(p.pid))
            except psutil.NoSuchProcess:
                continue
            assert_ge_0(nt)

            for name, tolerance in self.fun_names:
                if name == 'proc_memory_info' and p.pid == os.getpid():
                    continue
                if name == 'proc_create_time' and p.pid in (0, 4):
                    continue
                meth = wrap_exceptions(getattr(cext, name))
                try:
                    ret = meth(p.pid)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
                # compare values
                try:
                    if name == 'proc_cpu_times':
                        compare_with_tolerance(ret[0], nt.user_time, tolerance)
                        compare_with_tolerance(ret[1],
                                               nt.kernel_time, tolerance)
                    elif name == 'proc_create_time':
                        compare_with_tolerance(ret, nt.create_time, tolerance)
                    elif name == 'proc_num_handles':
                        compare_with_tolerance(ret, nt.num_handles, tolerance)
                    elif name == 'proc_io_counters':
                        compare_with_tolerance(ret[0], nt.io_rcount, tolerance)
                        compare_with_tolerance(ret[1], nt.io_wcount, tolerance)
                        compare_with_tolerance(ret[2], nt.io_rbytes, tolerance)
                        compare_with_tolerance(ret[3], nt.io_wbytes, tolerance)
                    elif name == 'proc_memory_info':
                        try:
                            rawtupl = cext.proc_memory_info_2(p.pid)
                        except psutil.NoSuchProcess:
                            continue
                        compare_with_tolerance(ret, rawtupl, tolerance)
                except AssertionError:
                    # Record the mismatch and move on to the next process
                    # so that all failures are reported together.
                    trace = traceback.format_exc()
                    msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % (
                        trace, p.pid, name, ret, nt)
                    failures.append(msg)
                    break

        if failures:
            self.fail('\n\n'.join(failures))

    # ---
    # same tests as above but mimics the AccessDenied failure of
    # the first (fast) method failing with AD.
    # A fresh Process instance is created inside each mock.patch block,
    # as the original tests did, so that no value obtained before the
    # patch can be reused.  Numeric results are compared using the
    # tolerances listed in fun_names.

    def test_name(self):
        name = psutil.Process(self.pid).name()
        with mock.patch("psutil._psplatform.cext.proc_exe",
                        side_effect=psutil.AccessDenied(self.pid)) as fun:
            self.assertEqual(psutil.Process(self.pid).name(), name)
            self.assertTrue(fun.called)

    def test_memory_info(self):
        mem = psutil.Process(self.pid).memory_info()
        with mock.patch("psutil._psplatform.cext.proc_memory_info",
                        side_effect=OSError(errno.EPERM, "msg")) as fun:
            fallback = psutil.Process(self.pid).memory_info()
            self.assertTrue(fun.called)
        self.assert_within_tolerance(mem, fallback, 'proc_memory_info')

    def test_create_time(self):
        ctime = psutil.Process(self.pid).create_time()
        with mock.patch("psutil._psplatform.cext.proc_create_time",
                        side_effect=OSError(errno.EPERM, "msg")) as fun:
            fallback = psutil.Process(self.pid).create_time()
            self.assertTrue(fun.called)
        self.assert_within_tolerance(ctime, fallback, 'proc_create_time')

    def test_cpu_times(self):
        cpu_times = psutil.Process(self.pid).cpu_times()
        with mock.patch("psutil._psplatform.cext.proc_cpu_times",
                        side_effect=OSError(errno.EPERM, "msg")) as fun:
            fallback = psutil.Process(self.pid).cpu_times()
            self.assertTrue(fun.called)
        self.assert_within_tolerance(cpu_times, fallback, 'proc_cpu_times')

    def test_io_counters(self):
        io_counters = psutil.Process(self.pid).io_counters()
        with mock.patch("psutil._psplatform.cext.proc_io_counters",
                        side_effect=OSError(errno.EPERM, "msg")) as fun:
            fallback = psutil.Process(self.pid).io_counters()
            self.assertTrue(fun.called)
        self.assert_within_tolerance(
            io_counters, fallback, 'proc_io_counters')

    def test_num_handles(self):
        num_handles = psutil.Process(self.pid).num_handles()
        with mock.patch("psutil._psplatform.cext.proc_num_handles",
                        side_effect=OSError(errno.EPERM, "msg")) as fun:
            fallback = psutil.Process(self.pid).num_handles()
            self.assertTrue(fun.called)
        self.assert_within_tolerance(
            num_handles, fallback, 'proc_num_handles')

    # --- other tests

    def test_compare_name_exe(self):
        for p in psutil.process_iter():
            try:
                a = os.path.basename(p.exe())
                b = p.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
            else:
                self.assertEqual(a, b)

    def test_zombies(self):
        # test that NSP is raised by the 2nd implementation in case a
        # process no longer exists
        ZOMBIE_PID = max(psutil.pids()) + 5000
        for name, _ in self.fun_names:
            meth = wrap_exceptions(getattr(cext, name))
            self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
 | 
						|
 | 
						|
 | 
						|
def main():
    """Run the two Windows test classes with a verbose text runner.

    Returns the value of ``result.wasSuccessful()`` for the combined run.
    """
    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # the loader method below collects the same "test*" methods.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    for test_class in (WindowsSpecificTestCase,
                       TestDualProcessImplementation):
        test_suite.addTest(loader.loadTestsFromTestCase(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
 | 
						|
 | 
						|
if __name__ == '__main__':
    # Report a failed run to the caller through a non-zero exit status.
    all_passed = main()
    if not all_passed:
        sys.exit(1)
 |