1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3"""
4Interactive python script for testing cgroups. It will try to use system
5resources such as cpu, memory and device IO. The other cgroups test
6instrumentation will inspect whether the linux box behaved as it should.
7
8@copyright: 2011 Red Hat Inc.
9@author: Lukas Doktor <ldoktor@redhat.com>
10"""
11import array, sys, time, math, os
12from tempfile import mktemp
13
def test_smoke(args):
    """
    Smoke test: announce the test, block on stdin, then report success.

    @param args: Extra command-line arguments (not used by this test).
    """
    for banner in ("TEST: smoke", "TEST: wait for input"):
        print(banner)
    # Nothing else happens until a line arrives on standard input.
    raw_input()
    print("PASS: smoke")
22
23
def test_memfill(args):
    """
    Wait for input, then grow an in-memory byte array one MiB at a time.

    A progress line is written to the report stream after every MiB so an
    observer can tell how far the allocation got.

    @param args: Optional list of strings. args[0] is the number of MiB to
                 allocate (default 1024); args[1] is a path that receives the
                 progress report instead of stdout.
    @raise ValueError: If args[0] cannot be converted by int().
    """
    size = 1024
    report = sys.stdout
    if args:
        size = int(args[0])
        if len(args) > 1:
            # Buffering 0 == unbuffered: progress lines hit the file at once.
            report = open(args[1], 'w', 0)
    try:
        print("TEST: memfill (%dM)" % size)
        print("Redirecting to: %s" % report.name)
        report.write("TEST: memfill (%dM)\n" % size)
        report.write("TEST: wait for input\n")
        raw_input()
        mem = array.array('B')
        # One MiB of zero bytes, built in a single step instead of a
        # million single-character concatenations.
        chunk = '\x00' * (1024 * 1024)
        for i in range(size):
            mem.fromstring(chunk)
            report.write("TEST: %dM\n" % i)
            try:
                report.flush()
                os.fsync(report.fileno())
            except (IOError, OSError):
                # Best effort only: syncing can fail on streams that do not
                # support it; do not hide KeyboardInterrupt/SystemExit.
                pass
        report.write("PASS: memfill (%dM)\n" % size)
    finally:
        # Only close what this function opened; never close stdout.
        if report is not sys.stdout:
            report.close()
52
53
def test_cpu(args):
    """
    Wait for input, then keep the CPU busy forever.

    The load is generated by computing the factorial of every integer from
    1000 up to 9999 over and over; this function never returns.

    @param args: Extra command-line arguments (not used by this test).
    """
    print("TEST: cpu")
    print("TEST: wait for input")
    raw_input()
    while True:
        number = 1000
        while number < 10000:
            math.factorial(number)
            number += 1
64
65
def test_devices(args):
    """
    Run the device write test or the device read test.

    @param args: Optional argument list. The write test runs only when the
                 first argument is exactly "write"; in every other case
                 (including no arguments) the read test runs.
    """
    wants_write = bool(args) and args[0] == "write"
    if wants_write:
        test_devices_write()
        return
    test_devices_read()
74
75
def test_devices_read():
    """
    Wait for input, then read from /dev/zero forever.

    Each iteration prints a tick, reads 1 MiB from the device and sleeps for
    one second; this function never returns.
    """
    print("TEST: devices read")
    print("TEST: wait for input")
    raw_input()

    chunk_size = 1024 * 1024
    zero_dev = open("/dev/zero", 'r')
    while 1:
        print("TEST: tick")
        zero_dev.flush()
        zero_dev.read(chunk_size)
        time.sleep(1)
90
91
def test_devices_write():
    """
    Wait for input, then write to /dev/null forever.

    Each iteration prints a tick, writes 1 MiB of zero bytes to the device,
    flushes it and sleeps for one second; this function never returns.
    """
    print("TEST: devices write")
    print("TEST: wait for input")
    raw_input()

    dev = open("/dev/null", 'w')
    # One MiB of zero bytes, built in a single step instead of a million
    # single-character concatenations.
    buf = '\x00' * (1024 * 1024)
    while True:
        print("TEST: tick")
        dev.write(buf)
        # Flush so the data is actually handed to the device on every tick.
        dev.flush()
        time.sleep(1)
109
110
def main():
    """
    Run the test selected by the first command-line argument.

    sys.argv[1] names the test ("smoke", "memfill", "cpu" or "devices") and
    any remaining arguments are handed to that test.

    @return: -1 when no test name was given or the name is not recognised,
             otherwise None once the selected test returns.
    """
    if len(sys.argv) < 2:
        print("FAIL: Incorrect usage (%s)" % sys.argv)
        return -1
    name = sys.argv[1]
    args = sys.argv[2:]
    if name == "smoke":
        test_smoke(args)
    elif name == "memfill":
        test_memfill(args)
    elif name == "cpu":
        test_cpu(args)
    elif name == "devices":
        test_devices(args)
    else:
        print("FAIL: No test specified (%s)" % sys.argv)
        # Report failure the same way the usage error above does.
        return -1
129
if __name__ == "__main__":
    # Propagate main()'s status (-1 on usage errors) as the process exit
    # code instead of silently discarding it.
    sys.exit(main())
132