forked from mirrors/gecko-dev
Backed out changeset 19f707f5c097 (bug 1666347) Backed out changeset 3732ee259759 (bug 1666345) Backed out changeset 353d3c9e74b9 (bug 1661624) Backed out changeset a651515586a8 (bug 1667152)
904 lines
35 KiB
Python
904 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Tests for system APIS."""
|
|
|
|
import contextlib
|
|
import datetime
|
|
import errno
|
|
import os
|
|
import pprint
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
import psutil
|
|
from psutil import AIX
|
|
from psutil import BSD
|
|
from psutil import FREEBSD
|
|
from psutil import LINUX
|
|
from psutil import MACOS
|
|
from psutil import NETBSD
|
|
from psutil import OPENBSD
|
|
from psutil import POSIX
|
|
from psutil import SUNOS
|
|
from psutil import WINDOWS
|
|
from psutil._compat import FileNotFoundError
|
|
from psutil._compat import long
|
|
from psutil.tests import ASCII_FS
|
|
from psutil.tests import check_net_address
|
|
from psutil.tests import CI_TESTING
|
|
from psutil.tests import DEVNULL
|
|
from psutil.tests import enum
|
|
from psutil.tests import get_test_subprocess
|
|
from psutil.tests import HAS_BATTERY
|
|
from psutil.tests import HAS_CPU_FREQ
|
|
from psutil.tests import HAS_GETLOADAVG
|
|
from psutil.tests import HAS_NET_IO_COUNTERS
|
|
from psutil.tests import HAS_SENSORS_BATTERY
|
|
from psutil.tests import HAS_SENSORS_FANS
|
|
from psutil.tests import HAS_SENSORS_TEMPERATURES
|
|
from psutil.tests import mock
|
|
from psutil.tests import PYPY
|
|
from psutil.tests import reap_children
|
|
from psutil.tests import retry_on_failure
|
|
from psutil.tests import safe_rmpath
|
|
from psutil.tests import TESTFN
|
|
from psutil.tests import TESTFN_UNICODE
|
|
from psutil.tests import TRAVIS
|
|
from psutil.tests import unittest
|
|
|
|
|
|
# ===================================================================
|
|
# --- System-related API tests
|
|
# ===================================================================
|
|
|
|
|
|
class TestProcessAPIs(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
safe_rmpath(TESTFN)
|
|
|
|
def tearDown(self):
|
|
reap_children()
|
|
|
|
def test_process_iter(self):
|
|
self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
|
|
sproc = get_test_subprocess()
|
|
self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
|
|
p = psutil.Process(sproc.pid)
|
|
p.kill()
|
|
p.wait()
|
|
self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
|
|
|
|
with mock.patch('psutil.Process',
|
|
side_effect=psutil.NoSuchProcess(os.getpid())):
|
|
self.assertEqual(list(psutil.process_iter()), [])
|
|
with mock.patch('psutil.Process',
|
|
side_effect=psutil.AccessDenied(os.getpid())):
|
|
with self.assertRaises(psutil.AccessDenied):
|
|
list(psutil.process_iter())
|
|
|
|
def test_prcess_iter_w_attrs(self):
|
|
for p in psutil.process_iter(attrs=['pid']):
|
|
self.assertEqual(list(p.info.keys()), ['pid'])
|
|
with self.assertRaises(ValueError):
|
|
list(psutil.process_iter(attrs=['foo']))
|
|
with mock.patch("psutil._psplatform.Process.cpu_times",
|
|
side_effect=psutil.AccessDenied(0, "")) as m:
|
|
for p in psutil.process_iter(attrs=["pid", "cpu_times"]):
|
|
self.assertIsNone(p.info['cpu_times'])
|
|
self.assertGreaterEqual(p.info['pid'], 0)
|
|
assert m.called
|
|
with mock.patch("psutil._psplatform.Process.cpu_times",
|
|
side_effect=psutil.AccessDenied(0, "")) as m:
|
|
flag = object()
|
|
for p in psutil.process_iter(
|
|
attrs=["pid", "cpu_times"], ad_value=flag):
|
|
self.assertIs(p.info['cpu_times'], flag)
|
|
self.assertGreaterEqual(p.info['pid'], 0)
|
|
assert m.called
|
|
|
|
@unittest.skipIf(PYPY and WINDOWS,
|
|
"get_test_subprocess() unreliable on PYPY + WINDOWS")
|
|
def test_wait_procs(self):
|
|
def callback(p):
|
|
pids.append(p.pid)
|
|
|
|
pids = []
|
|
sproc1 = get_test_subprocess()
|
|
sproc2 = get_test_subprocess()
|
|
sproc3 = get_test_subprocess()
|
|
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
|
|
self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
|
|
self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
|
|
t = time.time()
|
|
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
|
|
|
|
self.assertLess(time.time() - t, 0.5)
|
|
self.assertEqual(gone, [])
|
|
self.assertEqual(len(alive), 3)
|
|
self.assertEqual(pids, [])
|
|
for p in alive:
|
|
self.assertFalse(hasattr(p, 'returncode'))
|
|
|
|
@retry_on_failure(30)
|
|
def test(procs, callback):
|
|
gone, alive = psutil.wait_procs(procs, timeout=0.03,
|
|
callback=callback)
|
|
self.assertEqual(len(gone), 1)
|
|
self.assertEqual(len(alive), 2)
|
|
return gone, alive
|
|
|
|
sproc3.terminate()
|
|
gone, alive = test(procs, callback)
|
|
self.assertIn(sproc3.pid, [x.pid for x in gone])
|
|
if POSIX:
|
|
self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
|
|
else:
|
|
self.assertEqual(gone.pop().returncode, 1)
|
|
self.assertEqual(pids, [sproc3.pid])
|
|
for p in alive:
|
|
self.assertFalse(hasattr(p, 'returncode'))
|
|
|
|
@retry_on_failure(30)
|
|
def test(procs, callback):
|
|
gone, alive = psutil.wait_procs(procs, timeout=0.03,
|
|
callback=callback)
|
|
self.assertEqual(len(gone), 3)
|
|
self.assertEqual(len(alive), 0)
|
|
return gone, alive
|
|
|
|
sproc1.terminate()
|
|
sproc2.terminate()
|
|
gone, alive = test(procs, callback)
|
|
self.assertEqual(set(pids), set([sproc1.pid, sproc2.pid, sproc3.pid]))
|
|
for p in gone:
|
|
self.assertTrue(hasattr(p, 'returncode'))
|
|
|
|
@unittest.skipIf(PYPY and WINDOWS,
|
|
"get_test_subprocess() unreliable on PYPY + WINDOWS")
|
|
def test_wait_procs_no_timeout(self):
|
|
sproc1 = get_test_subprocess()
|
|
sproc2 = get_test_subprocess()
|
|
sproc3 = get_test_subprocess()
|
|
procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
|
|
for p in procs:
|
|
p.terminate()
|
|
gone, alive = psutil.wait_procs(procs)
|
|
|
|
def test_pid_exists(self):
|
|
sproc = get_test_subprocess()
|
|
self.assertTrue(psutil.pid_exists(sproc.pid))
|
|
p = psutil.Process(sproc.pid)
|
|
p.kill()
|
|
p.wait()
|
|
self.assertFalse(psutil.pid_exists(sproc.pid))
|
|
self.assertFalse(psutil.pid_exists(-1))
|
|
self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
|
|
|
|
def test_pid_exists_2(self):
|
|
reap_children()
|
|
pids = psutil.pids()
|
|
for pid in pids:
|
|
try:
|
|
assert psutil.pid_exists(pid)
|
|
except AssertionError:
|
|
# in case the process disappeared in meantime fail only
|
|
# if it is no longer in psutil.pids()
|
|
time.sleep(.1)
|
|
if pid in psutil.pids():
|
|
self.fail(pid)
|
|
pids = range(max(pids) + 5000, max(pids) + 6000)
|
|
for pid in pids:
|
|
self.assertFalse(psutil.pid_exists(pid), msg=pid)
|
|
|
|
def test_pids(self):
|
|
pidslist = psutil.pids()
|
|
procslist = [x.pid for x in psutil.process_iter()]
|
|
# make sure every pid is unique
|
|
self.assertEqual(sorted(set(pidslist)), pidslist)
|
|
self.assertEqual(pidslist, procslist)
|
|
|
|
|
|
class TestMiscAPIs(unittest.TestCase):
|
|
|
|
def test_boot_time(self):
|
|
bt = psutil.boot_time()
|
|
self.assertIsInstance(bt, float)
|
|
self.assertGreater(bt, 0)
|
|
self.assertLess(bt, time.time())
|
|
|
|
@unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
|
|
def test_users(self):
|
|
users = psutil.users()
|
|
self.assertNotEqual(users, [])
|
|
for user in users:
|
|
assert user.name, user
|
|
self.assertIsInstance(user.name, str)
|
|
self.assertIsInstance(user.terminal, (str, type(None)))
|
|
if user.host is not None:
|
|
self.assertIsInstance(user.host, (str, type(None)))
|
|
user.terminal
|
|
user.host
|
|
assert user.started > 0.0, user
|
|
datetime.datetime.fromtimestamp(user.started)
|
|
if WINDOWS or OPENBSD:
|
|
self.assertIsNone(user.pid)
|
|
else:
|
|
psutil.Process(user.pid)
|
|
|
|
@unittest.skipIf(not POSIX, 'POSIX only')
|
|
def test_PAGESIZE(self):
|
|
# pagesize is used internally to perform different calculations
|
|
# and it's determined by using SC_PAGE_SIZE; make sure
|
|
# getpagesize() returns the same value.
|
|
import resource
|
|
self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
|
|
|
|
def test_test(self):
|
|
# test for psutil.test() function
|
|
stdout = sys.stdout
|
|
sys.stdout = DEVNULL
|
|
try:
|
|
psutil.test()
|
|
finally:
|
|
sys.stdout = stdout
|
|
|
|
def test_os_constants(self):
|
|
names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD",
|
|
"NETBSD", "BSD", "SUNOS"]
|
|
for name in names:
|
|
self.assertIsInstance(getattr(psutil, name), bool, msg=name)
|
|
|
|
if os.name == 'posix':
|
|
assert psutil.POSIX
|
|
assert not psutil.WINDOWS
|
|
names.remove("POSIX")
|
|
if "linux" in sys.platform.lower():
|
|
assert psutil.LINUX
|
|
names.remove("LINUX")
|
|
elif "bsd" in sys.platform.lower():
|
|
assert psutil.BSD
|
|
self.assertEqual([psutil.FREEBSD, psutil.OPENBSD,
|
|
psutil.NETBSD].count(True), 1)
|
|
names.remove("BSD")
|
|
names.remove("FREEBSD")
|
|
names.remove("OPENBSD")
|
|
names.remove("NETBSD")
|
|
elif "sunos" in sys.platform.lower() or \
|
|
"solaris" in sys.platform.lower():
|
|
assert psutil.SUNOS
|
|
names.remove("SUNOS")
|
|
elif "darwin" in sys.platform.lower():
|
|
assert psutil.MACOS
|
|
names.remove("MACOS")
|
|
else:
|
|
assert psutil.WINDOWS
|
|
assert not psutil.POSIX
|
|
names.remove("WINDOWS")
|
|
|
|
# assert all other constants are set to False
|
|
for name in names:
|
|
self.assertIs(getattr(psutil, name), False, msg=name)
|
|
|
|
|
|
class TestMemoryAPIs(unittest.TestCase):
|
|
|
|
def test_virtual_memory(self):
|
|
mem = psutil.virtual_memory()
|
|
assert mem.total > 0, mem
|
|
assert mem.available > 0, mem
|
|
assert 0 <= mem.percent <= 100, mem
|
|
assert mem.used > 0, mem
|
|
assert mem.free >= 0, mem
|
|
for name in mem._fields:
|
|
value = getattr(mem, name)
|
|
if name != 'percent':
|
|
self.assertIsInstance(value, (int, long))
|
|
if name != 'total':
|
|
if not value >= 0:
|
|
self.fail("%r < 0 (%s)" % (name, value))
|
|
if value > mem.total:
|
|
self.fail("%r > total (total=%s, %s=%s)"
|
|
% (name, mem.total, name, value))
|
|
|
|
def test_swap_memory(self):
|
|
mem = psutil.swap_memory()
|
|
self.assertEqual(
|
|
mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout'))
|
|
|
|
assert mem.total >= 0, mem
|
|
assert mem.used >= 0, mem
|
|
if mem.total > 0:
|
|
# likely a system with no swap partition
|
|
assert mem.free > 0, mem
|
|
else:
|
|
assert mem.free == 0, mem
|
|
assert 0 <= mem.percent <= 100, mem
|
|
assert mem.sin >= 0, mem
|
|
assert mem.sout >= 0, mem
|
|
|
|
|
|
class TestCpuAPIs(unittest.TestCase):
|
|
|
|
def test_cpu_count_logical(self):
|
|
logical = psutil.cpu_count()
|
|
self.assertIsNotNone(logical)
|
|
self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
|
|
self.assertGreaterEqual(logical, 1)
|
|
#
|
|
if os.path.exists("/proc/cpuinfo"):
|
|
with open("/proc/cpuinfo") as fd:
|
|
cpuinfo_data = fd.read()
|
|
if "physical id" not in cpuinfo_data:
|
|
raise unittest.SkipTest("cpuinfo doesn't include physical id")
|
|
|
|
def test_cpu_count_physical(self):
|
|
logical = psutil.cpu_count()
|
|
physical = psutil.cpu_count(logical=False)
|
|
if physical is None:
|
|
raise self.skipTest("physical cpu_count() is None")
|
|
if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1): # <= Vista
|
|
self.assertIsNone(physical)
|
|
else:
|
|
self.assertGreaterEqual(physical, 1)
|
|
self.assertGreaterEqual(logical, physical)
|
|
|
|
def test_cpu_count_none(self):
|
|
# https://github.com/giampaolo/psutil/issues/1085
|
|
for val in (-1, 0, None):
|
|
with mock.patch('psutil._psplatform.cpu_count_logical',
|
|
return_value=val) as m:
|
|
self.assertIsNone(psutil.cpu_count())
|
|
assert m.called
|
|
with mock.patch('psutil._psplatform.cpu_count_physical',
|
|
return_value=val) as m:
|
|
self.assertIsNone(psutil.cpu_count(logical=False))
|
|
assert m.called
|
|
|
|
def test_cpu_times(self):
|
|
# Check type, value >= 0, str().
|
|
total = 0
|
|
times = psutil.cpu_times()
|
|
sum(times)
|
|
for cp_time in times:
|
|
self.assertIsInstance(cp_time, float)
|
|
self.assertGreaterEqual(cp_time, 0.0)
|
|
total += cp_time
|
|
self.assertEqual(total, sum(times))
|
|
str(times)
|
|
# CPU times are always supposed to increase over time
|
|
# or at least remain the same and that's because time
|
|
# cannot go backwards.
|
|
# Surprisingly sometimes this might not be the case (at
|
|
# least on Windows and Linux), see:
|
|
# https://github.com/giampaolo/psutil/issues/392
|
|
# https://github.com/giampaolo/psutil/issues/645
|
|
# if not WINDOWS:
|
|
# last = psutil.cpu_times()
|
|
# for x in range(100):
|
|
# new = psutil.cpu_times()
|
|
# for field in new._fields:
|
|
# new_t = getattr(new, field)
|
|
# last_t = getattr(last, field)
|
|
# self.assertGreaterEqual(new_t, last_t,
|
|
# msg="%s %s" % (new_t, last_t))
|
|
# last = new
|
|
|
|
def test_cpu_times_time_increases(self):
|
|
# Make sure time increases between calls.
|
|
t1 = sum(psutil.cpu_times())
|
|
stop_at = time.time() + 1
|
|
while time.time() < stop_at:
|
|
t2 = sum(psutil.cpu_times())
|
|
if t2 > t1:
|
|
return
|
|
self.fail("time remained the same")
|
|
|
|
def test_per_cpu_times(self):
|
|
# Check type, value >= 0, str().
|
|
for times in psutil.cpu_times(percpu=True):
|
|
total = 0
|
|
sum(times)
|
|
for cp_time in times:
|
|
self.assertIsInstance(cp_time, float)
|
|
self.assertGreaterEqual(cp_time, 0.0)
|
|
total += cp_time
|
|
self.assertEqual(total, sum(times))
|
|
str(times)
|
|
self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
|
|
len(psutil.cpu_times(percpu=False)))
|
|
|
|
# Note: in theory CPU times are always supposed to increase over
|
|
# time or remain the same but never go backwards. In practice
|
|
# sometimes this is not the case.
|
|
# This issue seemd to be afflict Windows:
|
|
# https://github.com/giampaolo/psutil/issues/392
|
|
# ...but it turns out also Linux (rarely) behaves the same.
|
|
# last = psutil.cpu_times(percpu=True)
|
|
# for x in range(100):
|
|
# new = psutil.cpu_times(percpu=True)
|
|
# for index in range(len(new)):
|
|
# newcpu = new[index]
|
|
# lastcpu = last[index]
|
|
# for field in newcpu._fields:
|
|
# new_t = getattr(newcpu, field)
|
|
# last_t = getattr(lastcpu, field)
|
|
# self.assertGreaterEqual(
|
|
# new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
|
|
# last = new
|
|
|
|
def test_per_cpu_times_2(self):
|
|
# Simulate some work load then make sure time have increased
|
|
# between calls.
|
|
tot1 = psutil.cpu_times(percpu=True)
|
|
giveup_at = time.time() + 1
|
|
while True:
|
|
if time.time() >= giveup_at:
|
|
return self.fail("timeout")
|
|
tot2 = psutil.cpu_times(percpu=True)
|
|
for t1, t2 in zip(tot1, tot2):
|
|
t1, t2 = psutil._cpu_busy_time(t1), psutil._cpu_busy_time(t2)
|
|
difference = t2 - t1
|
|
if difference >= 0.05:
|
|
return
|
|
|
|
def test_cpu_times_comparison(self):
|
|
# Make sure the sum of all per cpu times is almost equal to
|
|
# base "one cpu" times.
|
|
base = psutil.cpu_times()
|
|
per_cpu = psutil.cpu_times(percpu=True)
|
|
summed_values = base._make([sum(num) for num in zip(*per_cpu)])
|
|
for field in base._fields:
|
|
self.assertAlmostEqual(
|
|
getattr(base, field), getattr(summed_values, field), delta=1)
|
|
|
|
def _test_cpu_percent(self, percent, last_ret, new_ret):
|
|
try:
|
|
self.assertIsInstance(percent, float)
|
|
self.assertGreaterEqual(percent, 0.0)
|
|
self.assertIsNot(percent, -0.0)
|
|
self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
|
|
except AssertionError as err:
|
|
raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
|
|
err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
|
|
|
|
def test_cpu_percent(self):
|
|
last = psutil.cpu_percent(interval=0.001)
|
|
for x in range(100):
|
|
new = psutil.cpu_percent(interval=None)
|
|
self._test_cpu_percent(new, last, new)
|
|
last = new
|
|
with self.assertRaises(ValueError):
|
|
psutil.cpu_percent(interval=-1)
|
|
|
|
def test_per_cpu_percent(self):
|
|
last = psutil.cpu_percent(interval=0.001, percpu=True)
|
|
self.assertEqual(len(last), psutil.cpu_count())
|
|
for x in range(100):
|
|
new = psutil.cpu_percent(interval=None, percpu=True)
|
|
for percent in new:
|
|
self._test_cpu_percent(percent, last, new)
|
|
last = new
|
|
with self.assertRaises(ValueError):
|
|
psutil.cpu_percent(interval=-1, percpu=True)
|
|
|
|
def test_cpu_times_percent(self):
|
|
last = psutil.cpu_times_percent(interval=0.001)
|
|
for x in range(100):
|
|
new = psutil.cpu_times_percent(interval=None)
|
|
for percent in new:
|
|
self._test_cpu_percent(percent, last, new)
|
|
self._test_cpu_percent(sum(new), last, new)
|
|
last = new
|
|
|
|
def test_per_cpu_times_percent(self):
|
|
last = psutil.cpu_times_percent(interval=0.001, percpu=True)
|
|
self.assertEqual(len(last), psutil.cpu_count())
|
|
for x in range(100):
|
|
new = psutil.cpu_times_percent(interval=None, percpu=True)
|
|
for cpu in new:
|
|
for percent in cpu:
|
|
self._test_cpu_percent(percent, last, new)
|
|
self._test_cpu_percent(sum(cpu), last, new)
|
|
last = new
|
|
|
|
def test_per_cpu_times_percent_negative(self):
|
|
# see: https://github.com/giampaolo/psutil/issues/645
|
|
psutil.cpu_times_percent(percpu=True)
|
|
zero_times = [x._make([0 for x in range(len(x._fields))])
|
|
for x in psutil.cpu_times(percpu=True)]
|
|
with mock.patch('psutil.cpu_times', return_value=zero_times):
|
|
for cpu in psutil.cpu_times_percent(percpu=True):
|
|
for percent in cpu:
|
|
self._test_cpu_percent(percent, None, None)
|
|
|
|
def test_cpu_stats(self):
|
|
# Tested more extensively in per-platform test modules.
|
|
infos = psutil.cpu_stats()
|
|
self.assertEqual(
|
|
infos._fields,
|
|
('ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls'))
|
|
for name in infos._fields:
|
|
value = getattr(infos, name)
|
|
self.assertGreaterEqual(value, 0)
|
|
# on AIX, ctx_switches is always 0
|
|
if not AIX and name in ('ctx_switches', 'interrupts'):
|
|
self.assertGreater(value, 0)
|
|
|
|
@unittest.skipIf(not HAS_CPU_FREQ, "not suported")
|
|
def test_cpu_freq(self):
|
|
def check_ls(ls):
|
|
for nt in ls:
|
|
self.assertEqual(nt._fields, ('current', 'min', 'max'))
|
|
if nt.max != 0.0:
|
|
self.assertLessEqual(nt.current, nt.max)
|
|
for name in nt._fields:
|
|
value = getattr(nt, name)
|
|
self.assertIsInstance(value, (int, long, float))
|
|
self.assertGreaterEqual(value, 0)
|
|
|
|
ls = psutil.cpu_freq(percpu=True)
|
|
if TRAVIS and not ls:
|
|
raise self.skipTest("skipped on Travis")
|
|
if FREEBSD and not ls:
|
|
raise self.skipTest("returns empty list on FreeBSD")
|
|
|
|
assert ls, ls
|
|
check_ls([psutil.cpu_freq(percpu=False)])
|
|
|
|
if LINUX:
|
|
self.assertEqual(len(ls), psutil.cpu_count())
|
|
|
|
@unittest.skipIf(not HAS_GETLOADAVG, "not supported")
|
|
def test_getloadavg(self):
|
|
loadavg = psutil.getloadavg()
|
|
assert len(loadavg) == 3
|
|
|
|
for load in loadavg:
|
|
self.assertIsInstance(load, float)
|
|
self.assertGreaterEqual(load, 0.0)
|
|
|
|
|
|
class TestDiskAPIs(unittest.TestCase):
|
|
|
|
def test_disk_usage(self):
|
|
usage = psutil.disk_usage(os.getcwd())
|
|
self.assertEqual(usage._fields, ('total', 'used', 'free', 'percent'))
|
|
|
|
assert usage.total > 0, usage
|
|
assert usage.used > 0, usage
|
|
assert usage.free > 0, usage
|
|
assert usage.total > usage.used, usage
|
|
assert usage.total > usage.free, usage
|
|
assert 0 <= usage.percent <= 100, usage.percent
|
|
if hasattr(shutil, 'disk_usage'):
|
|
# py >= 3.3, see: http://bugs.python.org/issue12442
|
|
shutil_usage = shutil.disk_usage(os.getcwd())
|
|
tolerance = 5 * 1024 * 1024 # 5MB
|
|
self.assertEqual(usage.total, shutil_usage.total)
|
|
self.assertAlmostEqual(usage.free, shutil_usage.free,
|
|
delta=tolerance)
|
|
self.assertAlmostEqual(usage.used, shutil_usage.used,
|
|
delta=tolerance)
|
|
|
|
# if path does not exist OSError ENOENT is expected across
|
|
# all platforms
|
|
fname = tempfile.mktemp()
|
|
with self.assertRaises(FileNotFoundError):
|
|
psutil.disk_usage(fname)
|
|
|
|
def test_disk_usage_unicode(self):
|
|
# See: https://github.com/giampaolo/psutil/issues/416
|
|
if ASCII_FS:
|
|
with self.assertRaises(UnicodeEncodeError):
|
|
psutil.disk_usage(TESTFN_UNICODE)
|
|
|
|
def test_disk_usage_bytes(self):
|
|
psutil.disk_usage(b'.')
|
|
|
|
def test_disk_partitions(self):
|
|
# all = False
|
|
ls = psutil.disk_partitions(all=False)
|
|
# on travis we get:
|
|
# self.assertEqual(p.cpu_affinity(), [n])
|
|
# AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
|
|
self.assertTrue(ls, msg=ls)
|
|
for disk in ls:
|
|
self.assertIsInstance(disk.device, str)
|
|
self.assertIsInstance(disk.mountpoint, str)
|
|
self.assertIsInstance(disk.fstype, str)
|
|
self.assertIsInstance(disk.opts, str)
|
|
if WINDOWS and 'cdrom' in disk.opts:
|
|
continue
|
|
if not POSIX:
|
|
assert os.path.exists(disk.device), disk
|
|
else:
|
|
# we cannot make any assumption about this, see:
|
|
# http://goo.gl/p9c43
|
|
disk.device
|
|
# on modern systems mount points can also be files
|
|
assert os.path.exists(disk.mountpoint), disk
|
|
assert disk.fstype, disk
|
|
|
|
# all = True
|
|
ls = psutil.disk_partitions(all=True)
|
|
self.assertTrue(ls, msg=ls)
|
|
for disk in psutil.disk_partitions(all=True):
|
|
if not WINDOWS and disk.mountpoint:
|
|
try:
|
|
os.stat(disk.mountpoint)
|
|
except OSError as err:
|
|
if TRAVIS and MACOS and err.errno == errno.EIO:
|
|
continue
|
|
# http://mail.python.org/pipermail/python-dev/
|
|
# 2012-June/120787.html
|
|
if err.errno not in (errno.EPERM, errno.EACCES):
|
|
raise
|
|
else:
|
|
assert os.path.exists(disk.mountpoint), disk
|
|
self.assertIsInstance(disk.fstype, str)
|
|
self.assertIsInstance(disk.opts, str)
|
|
|
|
def find_mount_point(path):
|
|
path = os.path.abspath(path)
|
|
while not os.path.ismount(path):
|
|
path = os.path.dirname(path)
|
|
return path.lower()
|
|
|
|
mount = find_mount_point(__file__)
|
|
mounts = [x.mountpoint.lower() for x in
|
|
psutil.disk_partitions(all=True) if x.mountpoint]
|
|
self.assertIn(mount, mounts)
|
|
psutil.disk_usage(mount)
|
|
|
|
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
|
|
'/proc/diskstats not available on this linux version')
|
|
@unittest.skipIf(CI_TESTING and not psutil.disk_io_counters(),
|
|
"unreliable on CI") # no visible disks
|
|
def test_disk_io_counters(self):
|
|
def check_ntuple(nt):
|
|
self.assertEqual(nt[0], nt.read_count)
|
|
self.assertEqual(nt[1], nt.write_count)
|
|
self.assertEqual(nt[2], nt.read_bytes)
|
|
self.assertEqual(nt[3], nt.write_bytes)
|
|
if not (OPENBSD or NETBSD):
|
|
self.assertEqual(nt[4], nt.read_time)
|
|
self.assertEqual(nt[5], nt.write_time)
|
|
if LINUX:
|
|
self.assertEqual(nt[6], nt.read_merged_count)
|
|
self.assertEqual(nt[7], nt.write_merged_count)
|
|
self.assertEqual(nt[8], nt.busy_time)
|
|
elif FREEBSD:
|
|
self.assertEqual(nt[6], nt.busy_time)
|
|
for name in nt._fields:
|
|
assert getattr(nt, name) >= 0, nt
|
|
|
|
ret = psutil.disk_io_counters(perdisk=False)
|
|
assert ret is not None, "no disks on this system?"
|
|
check_ntuple(ret)
|
|
ret = psutil.disk_io_counters(perdisk=True)
|
|
# make sure there are no duplicates
|
|
self.assertEqual(len(ret), len(set(ret)))
|
|
for key in ret:
|
|
assert key, key
|
|
check_ntuple(ret[key])
|
|
|
|
def test_disk_io_counters_no_disks(self):
|
|
# Emulate a case where no disks are installed, see:
|
|
# https://github.com/giampaolo/psutil/issues/1062
|
|
with mock.patch('psutil._psplatform.disk_io_counters',
|
|
return_value={}) as m:
|
|
self.assertIsNone(psutil.disk_io_counters(perdisk=False))
|
|
self.assertEqual(psutil.disk_io_counters(perdisk=True), {})
|
|
assert m.called
|
|
|
|
|
|
class TestNetAPIs(unittest.TestCase):
|
|
|
|
@unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
|
|
def test_net_io_counters(self):
|
|
def check_ntuple(nt):
|
|
self.assertEqual(nt[0], nt.bytes_sent)
|
|
self.assertEqual(nt[1], nt.bytes_recv)
|
|
self.assertEqual(nt[2], nt.packets_sent)
|
|
self.assertEqual(nt[3], nt.packets_recv)
|
|
self.assertEqual(nt[4], nt.errin)
|
|
self.assertEqual(nt[5], nt.errout)
|
|
self.assertEqual(nt[6], nt.dropin)
|
|
self.assertEqual(nt[7], nt.dropout)
|
|
assert nt.bytes_sent >= 0, nt
|
|
assert nt.bytes_recv >= 0, nt
|
|
assert nt.packets_sent >= 0, nt
|
|
assert nt.packets_recv >= 0, nt
|
|
assert nt.errin >= 0, nt
|
|
assert nt.errout >= 0, nt
|
|
assert nt.dropin >= 0, nt
|
|
assert nt.dropout >= 0, nt
|
|
|
|
ret = psutil.net_io_counters(pernic=False)
|
|
check_ntuple(ret)
|
|
ret = psutil.net_io_counters(pernic=True)
|
|
self.assertNotEqual(ret, [])
|
|
for key in ret:
|
|
self.assertTrue(key)
|
|
self.assertIsInstance(key, str)
|
|
check_ntuple(ret[key])
|
|
|
|
@unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
|
|
def test_net_io_counters_no_nics(self):
|
|
# Emulate a case where no NICs are installed, see:
|
|
# https://github.com/giampaolo/psutil/issues/1062
|
|
with mock.patch('psutil._psplatform.net_io_counters',
|
|
return_value={}) as m:
|
|
self.assertIsNone(psutil.net_io_counters(pernic=False))
|
|
self.assertEqual(psutil.net_io_counters(pernic=True), {})
|
|
assert m.called
|
|
|
|
def test_net_if_addrs(self):
|
|
nics = psutil.net_if_addrs()
|
|
assert nics, nics
|
|
|
|
nic_stats = psutil.net_if_stats()
|
|
|
|
# Not reliable on all platforms (net_if_addrs() reports more
|
|
# interfaces).
|
|
# self.assertEqual(sorted(nics.keys()),
|
|
# sorted(psutil.net_io_counters(pernic=True).keys()))
|
|
|
|
families = set([socket.AF_INET, socket.AF_INET6, psutil.AF_LINK])
|
|
for nic, addrs in nics.items():
|
|
self.assertIsInstance(nic, str)
|
|
self.assertEqual(len(set(addrs)), len(addrs))
|
|
for addr in addrs:
|
|
self.assertIsInstance(addr.family, int)
|
|
self.assertIsInstance(addr.address, str)
|
|
self.assertIsInstance(addr.netmask, (str, type(None)))
|
|
self.assertIsInstance(addr.broadcast, (str, type(None)))
|
|
self.assertIn(addr.family, families)
|
|
if sys.version_info >= (3, 4):
|
|
self.assertIsInstance(addr.family, enum.IntEnum)
|
|
if nic_stats[nic].isup:
|
|
# Do not test binding to addresses of interfaces
|
|
# that are down
|
|
if addr.family == socket.AF_INET:
|
|
s = socket.socket(addr.family)
|
|
with contextlib.closing(s):
|
|
s.bind((addr.address, 0))
|
|
elif addr.family == socket.AF_INET6:
|
|
info = socket.getaddrinfo(
|
|
addr.address, 0, socket.AF_INET6,
|
|
socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
|
|
af, socktype, proto, canonname, sa = info
|
|
s = socket.socket(af, socktype, proto)
|
|
with contextlib.closing(s):
|
|
s.bind(sa)
|
|
for ip in (addr.address, addr.netmask, addr.broadcast,
|
|
addr.ptp):
|
|
if ip is not None:
|
|
# TODO: skip AF_INET6 for now because I get:
|
|
# AddressValueError: Only hex digits permitted in
|
|
# u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
|
|
if addr.family != socket.AF_INET6:
|
|
check_net_address(ip, addr.family)
|
|
# broadcast and ptp addresses are mutually exclusive
|
|
if addr.broadcast:
|
|
self.assertIsNone(addr.ptp)
|
|
elif addr.ptp:
|
|
self.assertIsNone(addr.broadcast)
|
|
|
|
if BSD or MACOS or SUNOS:
|
|
if hasattr(socket, "AF_LINK"):
|
|
self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
|
|
elif LINUX:
|
|
self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
|
|
elif WINDOWS:
|
|
self.assertEqual(psutil.AF_LINK, -1)
|
|
|
|
def test_net_if_addrs_mac_null_bytes(self):
|
|
# Simulate that the underlying C function returns an incomplete
|
|
# MAC address. psutil is supposed to fill it with null bytes.
|
|
# https://github.com/giampaolo/psutil/issues/786
|
|
if POSIX:
|
|
ret = [('em1', psutil.AF_LINK, '06:3d:29', None, None, None)]
|
|
else:
|
|
ret = [('em1', -1, '06-3d-29', None, None, None)]
|
|
with mock.patch('psutil._psplatform.net_if_addrs',
|
|
return_value=ret) as m:
|
|
addr = psutil.net_if_addrs()['em1'][0]
|
|
assert m.called
|
|
if POSIX:
|
|
self.assertEqual(addr.address, '06:3d:29:00:00:00')
|
|
else:
|
|
self.assertEqual(addr.address, '06-3d-29-00-00-00')
|
|
|
|
@unittest.skipIf(TRAVIS, "unreliable on TRAVIS") # raises EPERM
|
|
def test_net_if_stats(self):
|
|
nics = psutil.net_if_stats()
|
|
assert nics, nics
|
|
all_duplexes = (psutil.NIC_DUPLEX_FULL,
|
|
psutil.NIC_DUPLEX_HALF,
|
|
psutil.NIC_DUPLEX_UNKNOWN)
|
|
for name, stats in nics.items():
|
|
self.assertIsInstance(name, str)
|
|
isup, duplex, speed, mtu = stats
|
|
self.assertIsInstance(isup, bool)
|
|
self.assertIn(duplex, all_duplexes)
|
|
self.assertIn(duplex, all_duplexes)
|
|
self.assertGreaterEqual(speed, 0)
|
|
self.assertGreaterEqual(mtu, 0)
|
|
|
|
@unittest.skipIf(not (LINUX or BSD or MACOS),
|
|
"LINUX or BSD or MACOS specific")
|
|
def test_net_if_stats_enodev(self):
|
|
# See: https://github.com/giampaolo/psutil/issues/1279
|
|
with mock.patch('psutil._psutil_posix.net_if_mtu',
|
|
side_effect=OSError(errno.ENODEV, "")) as m:
|
|
ret = psutil.net_if_stats()
|
|
self.assertEqual(ret, {})
|
|
assert m.called
|
|
|
|
|
|
class TestSensorsAPIs(unittest.TestCase):
|
|
|
|
@unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
|
|
def test_sensors_temperatures(self):
|
|
temps = psutil.sensors_temperatures()
|
|
for name, entries in temps.items():
|
|
self.assertIsInstance(name, str)
|
|
for entry in entries:
|
|
self.assertIsInstance(entry.label, str)
|
|
if entry.current is not None:
|
|
self.assertGreaterEqual(entry.current, 0)
|
|
if entry.high is not None:
|
|
self.assertGreaterEqual(entry.high, 0)
|
|
if entry.critical is not None:
|
|
self.assertGreaterEqual(entry.critical, 0)
|
|
|
|
@unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
|
|
def test_sensors_temperatures_fahreneit(self):
|
|
d = {'coretemp': [('label', 50.0, 60.0, 70.0)]}
|
|
with mock.patch("psutil._psplatform.sensors_temperatures",
|
|
return_value=d) as m:
|
|
temps = psutil.sensors_temperatures(
|
|
fahrenheit=True)['coretemp'][0]
|
|
assert m.called
|
|
self.assertEqual(temps.current, 122.0)
|
|
self.assertEqual(temps.high, 140.0)
|
|
self.assertEqual(temps.critical, 158.0)
|
|
|
|
@unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
|
|
@unittest.skipIf(not HAS_BATTERY, "no battery")
|
|
def test_sensors_battery(self):
|
|
ret = psutil.sensors_battery()
|
|
self.assertGreaterEqual(ret.percent, 0)
|
|
self.assertLessEqual(ret.percent, 100)
|
|
if ret.secsleft not in (psutil.POWER_TIME_UNKNOWN,
|
|
psutil.POWER_TIME_UNLIMITED):
|
|
self.assertGreaterEqual(ret.secsleft, 0)
|
|
else:
|
|
if ret.secsleft == psutil.POWER_TIME_UNLIMITED:
|
|
self.assertTrue(ret.power_plugged)
|
|
self.assertIsInstance(ret.power_plugged, bool)
|
|
|
|
@unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
|
|
def test_sensors_fans(self):
|
|
fans = psutil.sensors_fans()
|
|
for name, entries in fans.items():
|
|
self.assertIsInstance(name, str)
|
|
for entry in entries:
|
|
self.assertIsInstance(entry.label, str)
|
|
self.assertIsInstance(entry.current, (int, long))
|
|
self.assertGreaterEqual(entry.current, 0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from psutil.tests.runner import run
|
|
run(__file__)
|