summaryrefslogtreecommitdiffstats
path: root/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py
blob: 0ff645a170b6176e274587f6215625bb972c3c3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import sys
import os
import unittest

sys.path.insert(0, os.getcwd())

import pybootchartgui.parsing as parsing
import pybootchartgui.process_tree as process_tree
import pybootchartgui.main as main

if sys.version_info >= (3, 0):
    long = int

class TestProcessTree(unittest.TestCase):

    def setUp(self):
        self.name = "Process tree unittest"
        self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')

        parser = main._mk_options_parser()
        options, args = parser.parse_args(['--q', self.rootdir])
        writer = main._mk_writer(options)
        self.writer = writer
        trace = parsing.Trace(writer, args, options)
        self.trace = trace

        parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
        trace.compile(writer)
        self.processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \
            trace.ps_stats.sample_period, None, options.prune, None, None, False, for_testing = True)

    def mk_fname(self,f):
        return os.path.join(self.rootdir, f)

    def flatten(self, process_tree):
        flattened = []
        for p in process_tree:
            flattened.append(p)
            flattened.extend(self.flatten(p.child_list))
        return flattened

    def checkAgainstJavaExtract(self, filename, process_tree):
        test_data = open(filename)
        for expected, actual in zip(test_data, self.flatten(process_tree)):
            tokens = expected.split('\t')
            self.assertEqual(int(tokens[0]), actual.pid // 1000)
            self.assertEqual(tokens[1], actual.cmd)
            self.assertEqual(long(tokens[2]), 10 * actual.start_time)
            self.assert_(long(tokens[3]) - 10 * actual.duration < 5, "duration")
            self.assertEqual(int(tokens[4]), len(actual.child_list))
            self.assertEqual(int(tokens[5]), len(actual.samples))
        test_data.close()

    def testBuild(self):
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), process_tree)

    def testMergeLogger(self):
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), process_tree)

    def test_merge_logger(self):
        proc_ps_path = os.path.join(os.path.dirname(sys.argv[0]), 'test_data/process_tree/proc_ps.log')
        trace = self.trace
        writer = self.writer
        parsing.parse_file(writer, trace, proc_ps_path)
        trace.compile(writer)
        processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \
            trace.ps_stats.sample_period, None, False, None, None, False, for_testing=True)
        removed = processtree.merge_logger(processtree.process_tree, 'bootchartd', None, False)
        self.assertNotEqual(removed, 0)

    def testPrune(self):
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), process_tree)

    def testMergeExploders(self):
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), process_tree)

    def testMergeSiblings(self):
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        self.processtree.merge_siblings(self.processtree.process_tree)
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), process_tree)

    def testMergeRuns(self):
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        self.processtree.merge_siblings(self.processtree.process_tree)
        self.processtree.merge_runs(self.processtree.process_tree)
        process_tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), process_tree)

if __name__ == '__main__':
    unittest.main()