aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch3855
1 files changed, 3832 insertions, 23 deletions
diff --git a/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch b/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch
index 714b2a2d34..fa58897e68 100644
--- a/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch
+++ b/meta-python/recipes-devtools/python/python3-yappi/0001-Fix-imports-for-ptests.patch
@@ -13,8 +13,6 @@ Signed-off-by: Trevor Gamblin <trevor.gamblin@windriver.com>
tests/test_tags.py | 2 +-
6 files changed, 6 insertions(+), 6 deletions(-)
-diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py
-index 4ac4c5f..4de94cf 100644
--- a/tests/test_asyncio.py
+++ b/tests/test_asyncio.py
@@ -2,7 +2,7 @@ import unittest
@@ -26,8 +24,6 @@ index 4ac4c5f..4de94cf 100644
@asyncio.coroutine
-diff --git a/tests/test_asyncio_context_vars.py b/tests/test_asyncio_context_vars.py
-index 5bd750c..9a253c0 100644
--- a/tests/test_asyncio_context_vars.py
+++ b/tests/test_asyncio_context_vars.py
@@ -5,7 +5,7 @@ import contextvars
@@ -39,21 +35,3841 @@ index 5bd750c..9a253c0 100644
import yappi
async_context_id = contextvars.ContextVar('async_context_id')
-diff --git a/tests/test_functionality.py b/tests/test_functionality.py
-index a73cb63..2ab273f 100644
--- a/tests/test_functionality.py
+++ b/tests/test_functionality.py
-@@ -5,7 +5,7 @@ import threading
- import unittest
- import yappi
- import _yappi
--import utils
+@@ -1,1916 +1,1916 @@
+-import os
+-import sys
+-import time
+-import threading
+-import unittest
+-import yappi
+-import _yappi
+-import utils
+-import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6
+-import subprocess
+-
+-_counter = 0
+-
+-
+-class BasicUsage(utils.YappiUnitTestCase):
+-
+- def test_callback_function_int_return_overflow(self):
+- # this test is just here to check if any errors are generated, as the err
+- # is printed in C side, I did not include it here. THere are ways to test
+- # this deterministically, I did not bother
+- import ctypes
+-
+- def _unsigned_overflow_margin():
+- return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
+-
+- def foo():
+- pass
+-
+- #with utils.captured_output() as (out, err):
+- yappi.set_context_id_callback(_unsigned_overflow_margin)
+- yappi.set_tag_callback(_unsigned_overflow_margin)
+- yappi.start()
+- foo()
+-
+- def test_issue60(self):
+-
+- def foo():
+- buf = bytearray()
+- buf += b't' * 200
+- view = memoryview(buf)[10:]
+- view = view.tobytes()
+- del buf[:10] # this throws exception
+- return view
+-
+- yappi.start(builtins=True)
+- foo()
+- self.assertTrue(
+- len(
+- yappi.get_func_stats(
+- filter_callback=lambda x: yappi.
+- func_matches(x, [memoryview.tobytes])
+- )
+- ) > 0
+- )
+- yappi.stop()
+-
+- def test_issue54(self):
+-
+- def _tag_cbk():
+- global _counter
+- _counter += 1
+- return _counter
+-
+- def a():
+- pass
+-
+- def b():
+- pass
+-
+- yappi.set_tag_callback(_tag_cbk)
+- yappi.start()
+- a()
+- a()
+- a()
+- yappi.stop()
+- stats = yappi.get_func_stats()
+- self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
+- stats = yappi.get_func_stats(tag=1)
+-
+- for i in range(1, 3):
+- stats = yappi.get_func_stats(tag=i)
+- stats = yappi.get_func_stats(
+- tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
+- )
+-
+- stat = stats.pop()
+- self.assertEqual(stat.ncall, 1)
+-
+- yappi.set_tag_callback(None)
+- yappi.clear_stats()
+- yappi.start()
+- b()
+- b()
+- stats = yappi.get_func_stats()
+- self.assertEqual(len(stats), 1)
+- stat = stats.pop()
+- self.assertEqual(stat.ncall, 2)
+-
+- def test_filter(self):
+-
+- def a():
+- pass
+-
+- def b():
+- a()
+-
+- def c():
+- b()
+-
+- _TCOUNT = 5
+-
+- ts = []
+- yappi.start()
+- for i in range(_TCOUNT):
+- t = threading.Thread(target=c)
+- t.start()
+- ts.append(t)
+-
+- for t in ts:
+- t.join()
+-
+- yappi.stop()
+-
+- ctx_ids = []
+- for tstat in yappi.get_thread_stats():
+- if tstat.name == '_MainThread':
+- main_ctx_id = tstat.id
+- else:
+- ctx_ids.append(tstat.id)
+-
+- fstats = yappi.get_func_stats(filter={"ctx_id": 9})
+- self.assertTrue(fstats.empty())
+- fstats = yappi.get_func_stats(
+- filter={
+- "ctx_id": main_ctx_id,
+- "name": "c"
+- }
+- ) # main thread
+- self.assertTrue(fstats.empty())
+-
+- for i in ctx_ids:
+- fstats = yappi.get_func_stats(
+- filter={
+- "ctx_id": i,
+- "name": "a",
+- "ncall": 1
+- }
+- )
+- self.assertEqual(fstats.pop().ncall, 1)
+- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
+- self.assertEqual(fstats.pop().ncall, 1)
+- fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
+- self.assertEqual(fstats.pop().ncall, 1)
+-
+- yappi.clear_stats()
+- yappi.start(builtins=True)
+- time.sleep(0.1)
+- yappi.stop()
+- fstats = yappi.get_func_stats(filter={"module": "time"})
+- self.assertEqual(len(fstats), 1)
+-
+- # invalid filters`
+- self.assertRaises(
+- Exception, yappi.get_func_stats, filter={'tag': "sss"}
+- )
+- self.assertRaises(
+- Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
+- )
+-
+- def test_filter_callback(self):
+-
+- def a():
+- time.sleep(0.1)
+-
+- def b():
+- a()
+-
+- def c():
+- pass
+-
+- def d():
+- pass
+-
+- yappi.set_clock_type("wall")
+- yappi.start(builtins=True)
+- a()
+- b()
+- c()
+- d()
+- stats = yappi.get_func_stats(
+- filter_callback=lambda x: yappi.func_matches(x, [a, b])
+- )
+- #stats.print_all()
+- r1 = '''
+- tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
+- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
+- '''
+- self.assert_traces_almost_equal(r1, stats)
+- self.assertEqual(len(stats), 2)
+- stats = yappi.get_func_stats(
+- filter_callback=lambda x: yappi.
+- module_matches(x, [sys.modules[__name__]])
+- )
+- r1 = '''
+- tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
+- tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
+- tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
+- tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
+- '''
+- self.assert_traces_almost_equal(r1, stats)
+- self.assertEqual(len(stats), 4)
+-
+- stats = yappi.get_func_stats(
+- filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
+- )
+- self.assertEqual(len(stats), 1)
+- r1 = '''
+- time.sleep 2 0.206804 0.220000 0.103402
+- '''
+- self.assert_traces_almost_equal(r1, stats)
+-
+- def test_print_formatting(self):
+-
+- def a():
+- pass
+-
+- def b():
+- a()
+-
+- func_cols = {
+- 1: ("name", 48),
+- 0: ("ncall", 5),
+- 2: ("tsub", 8),
+- }
+- thread_cols = {
+- 1: ("name", 48),
+- 0: ("ttot", 8),
+- }
+-
+- yappi.start()
+- a()
+- b()
+- yappi.stop()
+- fs = yappi.get_func_stats()
+- cs = fs[1].children
+- ts = yappi.get_thread_stats()
+- #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
+- #cs.print_all(out=sys.stderr, columns=func_cols)
+- #ts.print_all(out=sys.stderr, columns=thread_cols)
+- #cs.print_all(out=sys.stderr, columns={})
+-
+- self.assertRaises(
+- yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
+- )
+- self.assertRaises(
+- yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
+- )
+- self.assertRaises(
+- yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
+- )
+-
+- def test_get_clock(self):
+- yappi.set_clock_type('cpu')
+- self.assertEqual('cpu', yappi.get_clock_type())
+- clock_info = yappi.get_clock_info()
+- self.assertTrue('api' in clock_info)
+- self.assertTrue('resolution' in clock_info)
+-
+- yappi.set_clock_type('wall')
+- self.assertEqual('wall', yappi.get_clock_type())
+-
+- t0 = yappi.get_clock_time()
+- time.sleep(0.1)
+- duration = yappi.get_clock_time() - t0
+- self.assertTrue(0.05 < duration < 0.3)
+-
+- def test_profile_decorator(self):
+-
+- def aggregate(func, stats):
+- fname = "tests/%s.profile" % (func.__name__)
+- try:
+- stats.add(fname)
+- except IOError:
+- pass
+- stats.save(fname)
+- raise Exception("messing around")
+-
+- @yappi.profile(return_callback=aggregate)
+- def a(x, y):
+- if x + y == 25:
+- raise Exception("")
+- return x + y
+-
+- def b():
+- pass
+-
+- try:
+- os.remove(
+- "tests/a.profile"
+- ) # remove the one from prev test, if available
+- except:
+- pass
+-
+- # global profile is on to mess things up
+- yappi.start()
+- b()
+-
+- # assert functionality and call function at same time
+- try:
+- self.assertEqual(a(1, 2), 3)
+- except:
+- pass
+- try:
+- self.assertEqual(a(2, 5), 7)
+- except:
+- pass
+- try:
+- a(4, 21)
+- except:
+- pass
+- stats = yappi.get_func_stats().add("tests/a.profile")
+- fsa = utils.find_stat_by_name(stats, 'a')
+- self.assertEqual(fsa.ncall, 3)
+- self.assertEqual(len(stats), 1) # b() should be cleared out.
+-
+- @yappi.profile(return_callback=aggregate)
+- def count_down_rec(n):
+- if n == 0:
+- return
+- count_down_rec(n - 1)
+-
+- try:
+- os.remove(
+- "tests/count_down_rec.profile"
+- ) # remove the one from prev test, if available
+- except:
+- pass
+-
+- try:
+- count_down_rec(4)
+- except:
+- pass
+- try:
+- count_down_rec(3)
+- except:
+- pass
+-
+- stats = yappi.YFuncStats("tests/count_down_rec.profile")
+- fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
+- self.assertEqual(fsrec.ncall, 9)
+- self.assertEqual(fsrec.nactualcall, 2)
+-
+- def test_strip_dirs(self):
+-
+- def a():
+- pass
+-
+- stats = utils.run_and_get_func_stats(a, )
+- stats.strip_dirs()
+- fsa = utils.find_stat_by_name(stats, "a")
+- self.assertEqual(fsa.module, os.path.basename(fsa.module))
+-
+- @unittest.skipIf(os.name == "nt", "do not run on Windows")
+- def test_run_as_script(self):
+- import re
+- p = subprocess.Popen(
+- ['yappi', os.path.join('./tests', 'run_as_script.py')],
+- stdout=subprocess.PIPE
+- )
+- out, err = p.communicate()
+- self.assertEqual(p.returncode, 0)
+- func_stats, thread_stats = re.split(
+- b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
+- )
+- self.assertTrue(b'FancyThread' in thread_stats)
+-
+- def test_yappi_overhead(self):
+- LOOP_COUNT = 100000
+-
+- def a():
+- pass
+-
+- def b():
+- for i in range(LOOP_COUNT):
+- a()
+-
+- t0 = time.time()
+- yappi.start()
+- b()
+- yappi.stop()
+- time_with_yappi = time.time() - t0
+- t0 = time.time()
+- b()
+- time_without_yappi = time.time() - t0
+- if time_without_yappi == 0:
+- time_without_yappi = 0.000001
+-
+- # in latest v0.82, I calculated this as close to "7.0" in my machine.
+- # however, %83 of this overhead is coming from tickcount(). The other %17
+- # seems to have been evenly distributed to the internal bookkeeping
+- # structures/algorithms which seems acceptable. Note that our test only
+- # tests one function being profiled at-a-time in a short interval.
+- # profiling high number of functions in a small time
+- # is a different beast, (which is pretty unlikely in most applications)
+- # So as a conclusion: I cannot see any optimization window for Yappi that
+- # is worth implementing as we will only optimize %17 of the time.
+- sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
+- (time_with_yappi / time_without_yappi))
+-
+- def test_clear_stats_while_running(self):
+-
+- def a():
+- pass
+-
+- yappi.start()
+- a()
+- yappi.clear_stats()
+- a()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- self.assertEqual(fsa.ncall, 1)
+-
+- def test_generator(self):
+-
+- def _gen(n):
+- while (n > 0):
+- yield n
+- n -= 1
+-
+- yappi.start()
+- for x in _gen(5):
+- pass
+- self.assertTrue(
+- yappi.convert2pstats(yappi.get_func_stats()) is not None
+- )
+-
+- def test_slice_child_stats_and_strip_dirs(self):
+-
+- def b():
+- for i in range(10000000):
+- pass
+-
+- def a():
+- b()
+-
+- yappi.start(builtins=True)
+- a()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- self.assertTrue(fsa.children[0:1] is not None)
+- prev_afullname = fsa.full_name
+- prev_bchildfullname = fsa.children[fsb].full_name
+- stats.strip_dirs()
+- self.assertTrue(len(prev_afullname) > len(fsa.full_name))
+- self.assertTrue(
+- len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
+- )
+-
+- def test_children_stat_functions(self):
+- _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
+- _yappi._set_test_timings(_timings)
+-
+- def b():
+- pass
+-
+- def c():
+- pass
+-
+- def a():
+- b()
+- c()
+-
+- yappi.start()
+- a()
+- b() # non-child call
+- c() # non-child call
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- childs_of_a = fsa.children.get().sort("tavg", "desc")
+- prev_item = None
+- for item in childs_of_a:
+- if prev_item:
+- self.assertTrue(prev_item.tavg > item.tavg)
+- prev_item = item
+- childs_of_a.sort("name", "desc")
+- prev_item = None
+- for item in childs_of_a:
+- if prev_item:
+- self.assertTrue(prev_item.name > item.name)
+- prev_item = item
+- childs_of_a.clear()
+- self.assertTrue(childs_of_a.empty())
+-
+- def test_no_stats_different_clock_type_load(self):
+-
+- def a():
+- pass
+-
+- yappi.start()
+- a()
+- yappi.stop()
+- yappi.get_func_stats().save("tests/ystats1.ys")
+- yappi.clear_stats()
+- yappi.set_clock_type("WALL")
+- yappi.start()
+- yappi.stop()
+- stats = yappi.get_func_stats().add("tests/ystats1.ys")
+- fsa = utils.find_stat_by_name(stats, 'a')
+- self.assertTrue(fsa is not None)
+-
+- def test_subsequent_profile(self):
+- _timings = {"a_1": 1, "b_1": 1}
+- _yappi._set_test_timings(_timings)
+-
+- def a():
+- pass
+-
+- def b():
+- pass
+-
+- yappi.start()
+- a()
+- yappi.stop()
+- yappi.start()
+- b()
+- yappi.stop()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- self.assertTrue(fsa is not None)
+- self.assertTrue(fsb is not None)
+- self.assertEqual(fsa.ttot, 1)
+- self.assertEqual(fsb.ttot, 1)
+-
+- def test_lambda(self):
+- f = lambda: time.sleep(0.3)
+- yappi.set_clock_type("wall")
+- yappi.start()
+- f()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, '<lambda>')
+- self.assertTrue(fsa.ttot > 0.1)
+-
+- def test_module_stress(self):
+- self.assertEqual(yappi.is_running(), False)
+-
+- yappi.start()
+- yappi.clear_stats()
+- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
+-
+- yappi.stop()
+- yappi.clear_stats()
+- yappi.set_clock_type("cpu")
+- self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
+- self.assertEqual(yappi.is_running(), False)
+- yappi.clear_stats()
+- yappi.clear_stats()
+-
+- def test_stat_sorting(self):
+- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- b()
+-
+- def b():
+- if self._ncall == 2:
+- return
+- self._ncall += 1
+- a()
+-
+- stats = utils.run_and_get_func_stats(a)
+- stats = stats.sort("totaltime", "desc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.ttot >= stat.ttot)
+- prev_stat = stat
+- stats = stats.sort("totaltime", "asc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.ttot <= stat.ttot)
+- prev_stat = stat
+- stats = stats.sort("avgtime", "asc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.tavg <= stat.tavg)
+- prev_stat = stat
+- stats = stats.sort("name", "asc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.name <= stat.name)
+- prev_stat = stat
+- stats = stats.sort("subtime", "asc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.tsub <= stat.tsub)
+- prev_stat = stat
+-
+- self.assertRaises(
+- yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
+- )
+- self.assertRaises(
+- yappi.YappiError, stats.sort, "totaltime",
+- "invalid_func_sortorder_arg"
+- )
+-
+- def test_start_flags(self):
+- self.assertEqual(_yappi._get_start_flags(), None)
+- yappi.start()
+-
+- def a():
+- pass
+-
+- a()
+- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
+- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
+- self.assertEqual(len(yappi.get_thread_stats()), 1)
+-
+- def test_builtin_profiling(self):
+-
+- def a():
+- time.sleep(0.4) # is a builtin function
+-
+- yappi.set_clock_type('wall')
+-
+- yappi.start(builtins=True)
+- a()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'sleep')
+- self.assertTrue(fsa is not None)
+- self.assertTrue(fsa.ttot > 0.3)
+- yappi.stop()
+- yappi.clear_stats()
+-
+- def a():
+- pass
+-
+- yappi.start()
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- stats = yappi.get_func_stats()
+-
+- def test_singlethread_profiling(self):
+- yappi.set_clock_type('wall')
+-
+- def a():
+- time.sleep(0.2)
+-
+- class Worker1(threading.Thread):
+-
+- def a(self):
+- time.sleep(0.3)
+-
+- def run(self):
+- self.a()
+-
+- yappi.start(profile_threads=False)
+-
+- c = Worker1()
+- c.start()
+- c.join()
+- a()
+- stats = yappi.get_func_stats()
+- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
+- fsa2 = utils.find_stat_by_name(stats, 'a')
+- self.assertTrue(fsa1 is None)
+- self.assertTrue(fsa2 is not None)
+- self.assertTrue(fsa2.ttot > 0.1)
+-
+- def test_run(self):
+-
+- def profiled():
+- pass
+-
+- yappi.clear_stats()
+- try:
+- with yappi.run():
+- profiled()
+- stats = yappi.get_func_stats()
+- finally:
+- yappi.clear_stats()
+-
+- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
+-
+- def test_run_recursive(self):
+-
+- def profiled():
+- pass
+-
+- def not_profiled():
+- pass
+-
+- yappi.clear_stats()
+- try:
+- with yappi.run():
+- with yappi.run():
+- profiled()
+- # Profiling stopped here
+- not_profiled()
+- stats = yappi.get_func_stats()
+- finally:
+- yappi.clear_stats()
+-
+- self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
+- self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
+-
+-
+-class StatSaveScenarios(utils.YappiUnitTestCase):
+-
+- def test_pstats_conversion(self):
+-
+- def pstat_id(fs):
+- return (fs.module, fs.lineno, fs.name)
+-
+- def a():
+- d()
+-
+- def b():
+- d()
+-
+- def c():
+- pass
+-
+- def d():
+- pass
+-
+- _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
+- _yappi._set_test_timings(_timings)
+- stats = utils.run_and_get_func_stats(a, )
+- stats.strip_dirs()
+- stats.save("tests/a1.pstats", type="pstat")
+- fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
+- fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- stats = utils.run_and_get_func_stats(a, )
+- stats.strip_dirs()
+- stats.save("tests/a2.pstats", type="pstat")
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- stats = utils.run_and_get_func_stats(b, )
+- stats.strip_dirs()
+- stats.save("tests/b1.pstats", type="pstat")
+- fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- stats = utils.run_and_get_func_stats(c, )
+- stats.strip_dirs()
+- stats.save("tests/c1.pstats", type="pstat")
+- fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
+-
+- # merge saved stats and check pstats values are correct
+- import pstats
+- p = pstats.Stats(
+- 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
+- 'tests/c1.pstats'
+- )
+- p.strip_dirs()
+- # ct = ttot, tt = tsub
+- (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
+- self.assertEqual(cc, nc, 2)
+- self.assertEqual(tt, 20)
+- self.assertEqual(ct, 24)
+- (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
+- self.assertEqual(cc, nc, 3)
+- self.assertEqual(tt, 6)
+- self.assertEqual(ct, 6)
+- self.assertEqual(len(callers), 2)
+- (cc, nc, tt, ct) = callers[fsa_pid]
+- self.assertEqual(cc, nc, 2)
+- self.assertEqual(tt, 4)
+- self.assertEqual(ct, 4)
+- (cc, nc, tt, ct) = callers[fsb_pid]
+- self.assertEqual(cc, nc, 1)
+- self.assertEqual(tt, 2)
+- self.assertEqual(ct, 2)
+-
+- def test_merge_stats(self):
+- _timings = {
+- "a_1": 15,
+- "b_1": 14,
+- "c_1": 12,
+- "d_1": 10,
+- "e_1": 9,
+- "f_1": 7,
+- "g_1": 6,
+- "h_1": 5,
+- "i_1": 1
+- }
+- _yappi._set_test_timings(_timings)
+-
+- def a():
+- b()
+-
+- def b():
+- c()
+-
+- def c():
+- d()
+-
+- def d():
+- e()
+-
+- def e():
+- f()
+-
+- def f():
+- g()
+-
+- def g():
+- h()
+-
+- def h():
+- i()
+-
+- def i():
+- pass
+-
+- yappi.start()
+- a()
+- a()
+- yappi.stop()
+- stats = yappi.get_func_stats()
+- self.assertRaises(
+- NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
+- )
+- stats.save("tests/ystats2.ys")
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- yappi.start()
+- a()
+- stats = yappi.get_func_stats().add("tests/ystats2.ys")
+- fsa = utils.find_stat_by_name(stats, "a")
+- fsb = utils.find_stat_by_name(stats, "b")
+- fsc = utils.find_stat_by_name(stats, "c")
+- fsd = utils.find_stat_by_name(stats, "d")
+- fse = utils.find_stat_by_name(stats, "e")
+- fsf = utils.find_stat_by_name(stats, "f")
+- fsg = utils.find_stat_by_name(stats, "g")
+- fsh = utils.find_stat_by_name(stats, "h")
+- fsi = utils.find_stat_by_name(stats, "i")
+- self.assertEqual(fsa.ttot, 45)
+- self.assertEqual(fsa.ncall, 3)
+- self.assertEqual(fsa.nactualcall, 3)
+- self.assertEqual(fsa.tsub, 3)
+- self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
+- self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
+- self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
+- self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
+- self.assertEqual(fsc.tsub, 6)
+- self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
+- self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
+- self.assertEqual(fsd.children[fse].ttot, fse.ttot)
+- self.assertEqual(fsd.children[fse].tsub, fse.tsub)
+- self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
+- self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
+- self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
+- self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
+- self.assertEqual(fsg.ttot, 18)
+- self.assertEqual(fsg.tsub, 3)
+- self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
+- self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
+- self.assertEqual(fsh.ttot, 15)
+- self.assertEqual(fsh.tsub, 12)
+- self.assertEqual(fsh.tavg, 5)
+- self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
+- self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
+- #stats.debug_print()
+-
+- def test_merge_multithreaded_stats(self):
+- import _yappi
+- timings = {"a_1": 2, "b_1": 1}
+- _yappi._set_test_timings(timings)
+-
+- def a():
+- pass
+-
+- def b():
+- pass
+-
+- yappi.start()
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- t = threading.Thread(target=b)
+- t.start()
+- t.join()
+- yappi.get_func_stats().save("tests/ystats1.ys")
+- yappi.clear_stats()
+- _yappi._set_test_timings(timings)
+- self.assertEqual(len(yappi.get_func_stats()), 0)
+- self.assertEqual(len(yappi.get_thread_stats()), 1)
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+-
+- self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
+- self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
+- yappi.get_func_stats().save("tests/ystats2.ys")
+-
+- stats = yappi.YFuncStats([
+- "tests/ystats1.ys",
+- "tests/ystats2.ys",
+- ])
+- fsa = utils.find_stat_by_name(stats, "a")
+- fsb = utils.find_stat_by_name(stats, "b")
+- self.assertEqual(fsa.ncall, 2)
+- self.assertEqual(fsb.ncall, 1)
+- self.assertEqual(fsa.tsub, fsa.ttot, 4)
+- self.assertEqual(fsb.tsub, fsb.ttot, 1)
+-
+- def test_merge_load_different_clock_types(self):
+- yappi.start(builtins=True)
+-
+- def a():
+- b()
+-
+- def b():
+- c()
+-
+- def c():
+- pass
+-
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
+- yappi.stop()
+- yappi.clear_stats()
+- yappi.start(builtins=False)
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- yappi.get_func_stats().save("tests/ystats2.ys")
+- yappi.stop()
+- self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
+- yappi.clear_stats()
+- yappi.set_clock_type("wall")
+- yappi.start()
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- yappi.get_func_stats().save("tests/ystats3.ys")
+- self.assertRaises(
+- yappi.YappiError,
+- yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
+- )
+- stats = yappi.YFuncStats(["tests/ystats1.ys",
+- "tests/ystats2.ys"]).sort("name")
+- fsa = utils.find_stat_by_name(stats, "a")
+- fsb = utils.find_stat_by_name(stats, "b")
+- fsc = utils.find_stat_by_name(stats, "c")
+- self.assertEqual(fsa.ncall, 2)
+- self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
+-
+- def test_merge_aabab_aabbc(self):
+- _timings = {
+- "a_1": 15,
+- "a_2": 14,
+- "b_1": 12,
+- "a_3": 10,
+- "b_2": 9,
+- "c_1": 4
+- }
+- _yappi._set_test_timings(_timings)
+-
+- def a():
+- if self._ncall == 1:
+- self._ncall += 1
+- a()
+- elif self._ncall == 5:
+- self._ncall += 1
+- a()
+- else:
+- b()
+-
+- def b():
+- if self._ncall == 2:
+- self._ncall += 1
+- a()
+- elif self._ncall == 6:
+- self._ncall += 1
+- b()
+- elif self._ncall == 7:
+- c()
+- else:
+- return
+-
+- def c():
+- pass
+-
+- self._ncall = 1
+- stats = utils.run_and_get_func_stats(a, )
+- stats.save("tests/ystats1.ys")
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- #stats.print_all()
+-
+- self._ncall = 5
+- stats = utils.run_and_get_func_stats(a, )
+- stats.save("tests/ystats2.ys")
+-
+- #stats.print_all()
+-
+- def a(): # same name but another function(code object)
+- pass
+-
+- yappi.start()
+- a()
+- stats = yappi.get_func_stats().add(
+- ["tests/ystats1.ys", "tests/ystats2.ys"]
+- )
+- #stats.print_all()
+- self.assertEqual(len(stats), 4)
+-
+- fsa = None
+- for stat in stats:
+- if stat.name == "a" and stat.ttot == 45:
+- fsa = stat
+- break
+- self.assertTrue(fsa is not None)
+-
+- self.assertEqual(fsa.ncall, 7)
+- self.assertEqual(fsa.nactualcall, 3)
+- self.assertEqual(fsa.ttot, 45)
+- self.assertEqual(fsa.tsub, 10)
+- fsb = utils.find_stat_by_name(stats, "b")
+- fsc = utils.find_stat_by_name(stats, "c")
+- self.assertEqual(fsb.ncall, 6)
+- self.assertEqual(fsb.nactualcall, 3)
+- self.assertEqual(fsb.ttot, 36)
+- self.assertEqual(fsb.tsub, 27)
+- self.assertEqual(fsb.tavg, 6)
+- self.assertEqual(fsc.ttot, 8)
+- self.assertEqual(fsc.tsub, 8)
+- self.assertEqual(fsc.tavg, 4)
+- self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
+-
+-
+-class MultithreadedScenarios(utils.YappiUnitTestCase):
+-
+- def test_issue_32(self):
+- '''
+- Start yappi from different thread and we get Internal Error(15) as
+- the current_ctx_id() called while enumerating the threads in start()
+- and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
+- returns wrong object and thus sets an invalid id for the _ctx structure.
+-
+- When this issue happens multiple Threads have same tid as the internal ts_ptr
+- will be same for different contexts. So, let's see if that happens
+- '''
+-
+- def foo():
+- time.sleep(0.2)
+-
+- def bar():
+- time.sleep(0.1)
+-
+- def thread_func():
+- yappi.set_clock_type("wall")
+- yappi.start()
+-
+- bar()
+-
+- t = threading.Thread(target=thread_func)
+- t.start()
+- t.join()
+-
+- foo()
+-
+- yappi.stop()
+-
+- thread_ids = set()
+- for tstat in yappi.get_thread_stats():
+- self.assertTrue(tstat.tid not in thread_ids)
+- thread_ids.add(tstat.tid)
+-
+- def test_subsequent_profile(self):
+- WORKER_COUNT = 5
+-
+- def a():
+- pass
+-
+- def b():
+- pass
+-
+- def c():
+- pass
+-
+- _timings = {
+- "a_1": 3,
+- "b_1": 2,
+- "c_1": 1,
+- }
+-
+- yappi.start()
+-
+- def g():
+- pass
+-
+- g()
+- yappi.stop()
+- yappi.clear_stats()
+- _yappi._set_test_timings(_timings)
+- yappi.start()
+-
+- _dummy = []
+- for i in range(WORKER_COUNT):
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- for i in range(WORKER_COUNT):
+- t = threading.Thread(target=b)
+- t.start()
+- _dummy.append(t)
+- t.join()
+- for i in range(WORKER_COUNT):
+- t = threading.Thread(target=a)
+- t.start()
+- t.join()
+- for i in range(WORKER_COUNT):
+- t = threading.Thread(target=c)
+- t.start()
+- t.join()
+- yappi.stop()
+- yappi.start()
+-
+- def f():
+- pass
+-
+- f()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- self.assertEqual(fsa.ncall, 10)
+- self.assertEqual(fsb.ncall, 5)
+- self.assertEqual(fsc.ncall, 5)
+- self.assertEqual(fsa.ttot, fsa.tsub, 30)
+- self.assertEqual(fsb.ttot, fsb.tsub, 10)
+- self.assertEqual(fsc.ttot, fsc.tsub, 5)
+-
+- # MACOSx optimizes by only creating one worker thread
+- self.assertTrue(len(yappi.get_thread_stats()) >= 2)
+-
+- def test_basic(self):
+- yappi.set_clock_type('wall')
+-
+- def dummy():
+- pass
+-
+- def a():
+- time.sleep(0.2)
+-
+- class Worker1(threading.Thread):
+-
+- def a(self):
+- time.sleep(0.3)
+-
+- def run(self):
+- self.a()
+-
+- yappi.start(builtins=False, profile_threads=True)
+-
+- c = Worker1()
+- c.start()
+- c.join()
+- a()
+- stats = yappi.get_func_stats()
+- fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
+- fsa2 = utils.find_stat_by_name(stats, 'a')
+- self.assertTrue(fsa1 is not None)
+- self.assertTrue(fsa2 is not None)
+- self.assertTrue(fsa1.ttot > 0.2)
+- self.assertTrue(fsa2.ttot > 0.1)
+- tstats = yappi.get_thread_stats()
+- self.assertEqual(len(tstats), 2)
+- tsa = utils.find_stat_by_name(tstats, 'Worker1')
+- tsm = utils.find_stat_by_name(tstats, '_MainThread')
+- dummy() # call dummy to force ctx name to be retrieved again.
+- self.assertTrue(tsa is not None)
+- # TODO: I put dummy() to fix below, remove the comments after a while.
+- self.assertTrue( # FIX: I see this fails sometimes?
+- tsm is not None,
+- 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats))))
+-
+- def test_ctx_stats(self):
+- from threading import Thread
+- DUMMY_WORKER_COUNT = 5
+- yappi.start()
+-
+- class DummyThread(Thread):
+- pass
+-
+- def dummy():
+- pass
+-
+- def dummy_worker():
+- pass
+-
+- for i in range(DUMMY_WORKER_COUNT):
+- t = DummyThread(target=dummy_worker)
+- t.start()
+- t.join()
+- yappi.stop()
+- stats = yappi.get_thread_stats()
+- tsa = utils.find_stat_by_name(stats, "DummyThread")
+- self.assertTrue(tsa is not None)
+- yappi.clear_stats()
+- time.sleep(1.0)
+- _timings = {
+- "a_1": 6,
+- "b_1": 5,
+- "c_1": 3,
+- "d_1": 1,
+- "a_2": 4,
+- "b_2": 3,
+- "c_2": 2,
+- "d_2": 1
+- }
+- _yappi._set_test_timings(_timings)
+-
+- class Thread1(Thread):
+- pass
+-
+- class Thread2(Thread):
+- pass
+-
+- def a():
+- b()
+-
+- def b():
+- c()
+-
+- def c():
+- d()
+-
+- def d():
+- time.sleep(0.6)
+-
+- yappi.set_clock_type("wall")
+- yappi.start()
+- t1 = Thread1(target=a)
+- t1.start()
+- t2 = Thread2(target=a)
+- t2.start()
+- t1.join()
+- t2.join()
+- stats = yappi.get_thread_stats()
+-
+- # the fist clear_stats clears the context table?
+- tsa = utils.find_stat_by_name(stats, "DummyThread")
+- self.assertTrue(tsa is None)
+-
+- tst1 = utils.find_stat_by_name(stats, "Thread1")
+- tst2 = utils.find_stat_by_name(stats, "Thread2")
+- tsmain = utils.find_stat_by_name(stats, "_MainThread")
+- dummy() # call dummy to force ctx name to be retrieved again.
+- self.assertTrue(len(stats) == 3)
+- self.assertTrue(tst1 is not None)
+- self.assertTrue(tst2 is not None)
+- # TODO: I put dummy() to fix below, remove the comments after a while.
+- self.assertTrue( # FIX: I see this fails sometimes
+- tsmain is not None,
+- 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats))))
+- self.assertTrue(1.0 > tst2.ttot >= 0.5)
+- self.assertTrue(1.0 > tst1.ttot >= 0.5)
+-
+- # test sorting of the ctx stats
+- stats = stats.sort("totaltime", "desc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.ttot >= stat.ttot)
+- prev_stat = stat
+- stats = stats.sort("totaltime", "asc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.ttot <= stat.ttot)
+- prev_stat = stat
+- stats = stats.sort("schedcount", "desc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.sched_count >= stat.sched_count)
+- prev_stat = stat
+- stats = stats.sort("name", "desc")
+- prev_stat = None
+- for stat in stats:
+- if prev_stat:
+- self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
+- prev_stat = stat
+- self.assertRaises(
+- yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
+- )
+- self.assertRaises(
+- yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
+- )
+-
+- def test_ctx_stats_cpu(self):
+-
+- def get_thread_name():
+- try:
+- return threading.current_thread().name
+- except AttributeError:
+- return "Anonymous"
+-
+- def burn_cpu(sec):
+- t0 = yappi.get_clock_time()
+- elapsed = 0
+- while (elapsed < sec):
+- for _ in range(1000):
+- pass
+- elapsed = yappi.get_clock_time() - t0
+-
+- def test():
+-
+- ts = []
+- for i in (0.01, 0.05, 0.1):
+- t = threading.Thread(target=burn_cpu, args=(i, ))
+- t.name = "burn_cpu-%s" % str(i)
+- t.start()
+- ts.append(t)
+- for t in ts:
+- t.join()
+-
+- yappi.set_clock_type("cpu")
+- yappi.set_context_name_callback(get_thread_name)
+-
+- yappi.start()
+-
+- test()
+-
+- yappi.stop()
+-
+- tstats = yappi.get_thread_stats()
+- r1 = '''
+- burn_cpu-0.1 3 123145356058624 0.100105 8
+- burn_cpu-0.05 2 123145361313792 0.050149 8
+- burn_cpu-0.01 1 123145356058624 0.010127 2
+- MainThread 0 4321620864 0.001632 6
+- '''
+- self.assert_ctx_stats_almost_equal(r1, tstats)
+-
+- def test_producer_consumer_with_queues(self):
+- # we currently just stress yappi, no functionality test is done here.
+- yappi.start()
+- if utils.is_py3x():
+- from queue import Queue
+- else:
+- from Queue import Queue
+- from threading import Thread
+- WORKER_THREAD_COUNT = 50
+- WORK_ITEM_COUNT = 2000
+-
+- def worker():
+- while True:
+- item = q.get()
+- # do the work with item
+- q.task_done()
+-
+- q = Queue()
+- for i in range(WORKER_THREAD_COUNT):
+- t = Thread(target=worker)
+- t.daemon = True
+- t.start()
+-
+- for item in range(WORK_ITEM_COUNT):
+- q.put(item)
+- q.join() # block until all tasks are done
+- #yappi.get_func_stats().sort("callcount").print_all()
+- yappi.stop()
+-
+- def test_temporary_lock_waiting(self):
+- yappi.start()
+- _lock = threading.Lock()
+-
+- def worker():
+- _lock.acquire()
+- try:
+- time.sleep(1.0)
+- finally:
+- _lock.release()
+-
+- t1 = threading.Thread(target=worker)
+- t2 = threading.Thread(target=worker)
+- t1.start()
+- t2.start()
+- t1.join()
+- t2.join()
+- #yappi.get_func_stats().sort("callcount").print_all()
+- yappi.stop()
+-
+- @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
+- def test_signals_with_blocking_calls(self):
+- import signal, os, time
+-
+- # just to verify if signal is handled correctly and stats/yappi are not corrupted.
+- def handler(signum, frame):
+- raise Exception("Signal handler executed!")
+-
+- yappi.start()
+- signal.signal(signal.SIGALRM, handler)
+- signal.alarm(1)
+- self.assertRaises(Exception, time.sleep, 2)
+- stats = yappi.get_func_stats()
+- fsh = utils.find_stat_by_name(stats, "handler")
+- self.assertTrue(fsh is not None)
+-
+- @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
+- def test_concurrent_futures(self):
+- yappi.start()
+- from concurrent.futures import ThreadPoolExecutor
+- with ThreadPoolExecutor(max_workers=5) as executor:
+- f = executor.submit(pow, 5, 2)
+- self.assertEqual(f.result(), 25)
+- time.sleep(1.0)
+- yappi.stop()
+-
+- @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
+- def test_barrier(self):
+- yappi.start()
+- b = threading.Barrier(2, timeout=1)
+-
+- def worker():
+- try:
+- b.wait()
+- except threading.BrokenBarrierError:
+- pass
+- except Exception:
+- raise Exception("BrokenBarrierError not raised")
+-
+- t1 = threading.Thread(target=worker)
+- t1.start()
+- #b.wait()
+- t1.join()
+- yappi.stop()
+-
+-
+-class NonRecursiveFunctions(utils.YappiUnitTestCase):
+-
+- def test_abcd(self):
+- _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
+- _yappi._set_test_timings(_timings)
+-
+- def a():
+- b()
+-
+- def b():
+- c()
+-
+- def c():
+- d()
+-
+- def d():
+- pass
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- fsd = utils.find_stat_by_name(stats, 'd')
+- cfsab = fsa.children[fsb]
+- cfsbc = fsb.children[fsc]
+- cfscd = fsc.children[fsd]
+-
+- self.assertEqual(fsa.ttot, 6)
+- self.assertEqual(fsa.tsub, 1)
+- self.assertEqual(fsb.ttot, 5)
+- self.assertEqual(fsb.tsub, 2)
+- self.assertEqual(fsc.ttot, 3)
+- self.assertEqual(fsc.tsub, 2)
+- self.assertEqual(fsd.ttot, 1)
+- self.assertEqual(fsd.tsub, 1)
+- self.assertEqual(cfsab.ttot, 5)
+- self.assertEqual(cfsab.tsub, 2)
+- self.assertEqual(cfsbc.ttot, 3)
+- self.assertEqual(cfsbc.tsub, 2)
+- self.assertEqual(cfscd.ttot, 1)
+- self.assertEqual(cfscd.tsub, 1)
+-
+- def test_stop_in_middle(self):
+- _timings = {"a_1": 6, "b_1": 4}
+- _yappi._set_test_timings(_timings)
+-
+- def a():
+- b()
+- yappi.stop()
+-
+- def b():
+- time.sleep(0.2)
+-
+- yappi.start()
+- a()
+- stats = yappi.get_func_stats()
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+-
+- self.assertEqual(fsa.ncall, 1)
+- self.assertEqual(fsa.nactualcall, 0)
+- self.assertEqual(fsa.ttot, 0) # no call_leave called
+- self.assertEqual(fsa.tsub, 0) # no call_leave called
+- self.assertEqual(fsb.ttot, 4)
+-
+-
+-class RecursiveFunctions(utils.YappiUnitTestCase):
+-
+- def test_fibonacci(self):
+-
+- def fib(n):
+- if n > 1:
+- return fib(n - 1) + fib(n - 2)
+- else:
+- return n
+-
+- stats = utils.run_and_get_func_stats(fib, 22)
+- fs = utils.find_stat_by_name(stats, 'fib')
+- self.assertEqual(fs.ncall, 57313)
+- self.assertEqual(fs.ttot, fs.tsub)
+-
+- def test_abcadc(self):
+- _timings = {
+- "a_1": 20,
+- "b_1": 19,
+- "c_1": 17,
+- "a_2": 13,
+- "d_1": 12,
+- "c_2": 10,
+- "a_3": 5
+- }
+- _yappi._set_test_timings(_timings)
+-
+- def a(n):
+- if n == 3:
+- return
+- if n == 1 + 1:
+- d(n)
+- else:
+- b(n)
+-
+- def b(n):
+- c(n)
+-
+- def c(n):
+- a(n + 1)
+-
+- def d(n):
+- c(n)
+-
+- stats = utils.run_and_get_func_stats(a, 1)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- fsd = utils.find_stat_by_name(stats, 'd')
+- self.assertEqual(fsa.ncall, 3)
+- self.assertEqual(fsa.nactualcall, 1)
+- self.assertEqual(fsa.ttot, 20)
+- self.assertEqual(fsa.tsub, 7)
+- self.assertEqual(fsb.ttot, 19)
+- self.assertEqual(fsb.tsub, 2)
+- self.assertEqual(fsc.ttot, 17)
+- self.assertEqual(fsc.tsub, 9)
+- self.assertEqual(fsd.ttot, 12)
+- self.assertEqual(fsd.tsub, 2)
+- cfsca = fsc.children[fsa]
+- self.assertEqual(cfsca.nactualcall, 0)
+- self.assertEqual(cfsca.ncall, 2)
+- self.assertEqual(cfsca.ttot, 13)
+- self.assertEqual(cfsca.tsub, 6)
+-
+- def test_aaaa(self):
+- _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
+- _yappi._set_test_timings(_timings)
+-
+- def d(n):
+- if n == 3:
+- return
+- d(n + 1)
+-
+- stats = utils.run_and_get_func_stats(d, 0)
+- fsd = utils.find_stat_by_name(stats, 'd')
+- self.assertEqual(fsd.ncall, 4)
+- self.assertEqual(fsd.nactualcall, 1)
+- self.assertEqual(fsd.ttot, 9)
+- self.assertEqual(fsd.tsub, 9)
+- cfsdd = fsd.children[fsd]
+- self.assertEqual(cfsdd.ttot, 7)
+- self.assertEqual(cfsdd.tsub, 7)
+- self.assertEqual(cfsdd.ncall, 3)
+- self.assertEqual(cfsdd.nactualcall, 0)
+-
+- def test_abcabc(self):
+- _timings = {
+- "a_1": 20,
+- "b_1": 19,
+- "c_1": 17,
+- "a_2": 13,
+- "b_2": 11,
+- "c_2": 9,
+- "a_3": 6
+- }
+- _yappi._set_test_timings(_timings)
+-
+- def a(n):
+- if n == 3:
+- return
+- else:
+- b(n)
+-
+- def b(n):
+- c(n)
+-
+- def c(n):
+- a(n + 1)
+-
+- stats = utils.run_and_get_func_stats(a, 1)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- self.assertEqual(fsa.ncall, 3)
+- self.assertEqual(fsa.nactualcall, 1)
+- self.assertEqual(fsa.ttot, 20)
+- self.assertEqual(fsa.tsub, 9)
+- self.assertEqual(fsb.ttot, 19)
+- self.assertEqual(fsb.tsub, 4)
+- self.assertEqual(fsc.ttot, 17)
+- self.assertEqual(fsc.tsub, 7)
+- cfsab = fsa.children[fsb]
+- cfsbc = fsb.children[fsc]
+- cfsca = fsc.children[fsa]
+- self.assertEqual(cfsab.ttot, 19)
+- self.assertEqual(cfsab.tsub, 4)
+- self.assertEqual(cfsbc.ttot, 17)
+- self.assertEqual(cfsbc.tsub, 7)
+- self.assertEqual(cfsca.ttot, 13)
+- self.assertEqual(cfsca.tsub, 8)
+-
+- def test_abcbca(self):
+- _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
+- _yappi._set_test_timings(_timings)
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 1:
+- b()
+- else:
+- return
+-
+- def b():
+- c()
+-
+- def c():
+- if self._ncall == 1:
+- self._ncall += 1
+- b()
+- else:
+- a()
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- cfsab = fsa.children[fsb]
+- cfsbc = fsb.children[fsc]
+- cfsca = fsc.children[fsa]
+- self.assertEqual(fsa.ttot, 10)
+- self.assertEqual(fsa.tsub, 2)
+- self.assertEqual(fsb.ttot, 9)
+- self.assertEqual(fsb.tsub, 4)
+- self.assertEqual(fsc.ttot, 7)
+- self.assertEqual(fsc.tsub, 4)
+- self.assertEqual(cfsab.ttot, 9)
+- self.assertEqual(cfsab.tsub, 2)
+- self.assertEqual(cfsbc.ttot, 7)
+- self.assertEqual(cfsbc.tsub, 4)
+- self.assertEqual(cfsca.ttot, 1)
+- self.assertEqual(cfsca.tsub, 1)
+- self.assertEqual(cfsca.ncall, 1)
+- self.assertEqual(cfsca.nactualcall, 0)
+-
+- def test_aabccb(self):
+- _timings = {
+- "a_1": 13,
+- "a_2": 11,
+- "b_1": 9,
+- "c_1": 5,
+- "c_2": 3,
+- "b_2": 1
+- }
+- _yappi._set_test_timings(_timings)
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 1:
+- self._ncall += 1
+- a()
+- else:
+- b()
+-
+- def b():
+- if self._ncall == 3:
+- return
+- else:
+- c()
+-
+- def c():
+- if self._ncall == 2:
+- self._ncall += 1
+- c()
+- else:
+- b()
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- fsc = utils.find_stat_by_name(stats, 'c')
+- cfsaa = fsa.children[fsa.index]
+- cfsab = fsa.children[fsb]
+- cfsbc = fsb.children[fsc.full_name]
+- cfscc = fsc.children[fsc]
+- cfscb = fsc.children[fsb]
+- self.assertEqual(fsb.ttot, 9)
+- self.assertEqual(fsb.tsub, 5)
+- self.assertEqual(cfsbc.ttot, 5)
+- self.assertEqual(cfsbc.tsub, 2)
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 4)
+- self.assertEqual(cfsab.ttot, 9)
+- self.assertEqual(cfsab.tsub, 4)
+- self.assertEqual(cfsaa.ttot, 11)
+- self.assertEqual(cfsaa.tsub, 2)
+- self.assertEqual(fsc.ttot, 5)
+- self.assertEqual(fsc.tsub, 4)
+-
+- def test_abaa(self):
+- _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 1:
+- b()
+- elif self._ncall == 2:
+- self._ncall += 1
+- a()
+- else:
+- return
+-
+- def b():
+- self._ncall += 1
+- a()
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- cfsaa = fsa.children[fsa]
+- cfsba = fsb.children[fsa]
+- self.assertEqual(fsb.ttot, 10)
+- self.assertEqual(fsb.tsub, 1)
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 12)
+- self.assertEqual(cfsaa.ttot, 5)
+- self.assertEqual(cfsaa.tsub, 5)
+- self.assertEqual(cfsba.ttot, 9)
+- self.assertEqual(cfsba.tsub, 4)
+-
+- def test_aabb(self):
+- _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 1:
+- self._ncall += 1
+- a()
+- elif self._ncall == 2:
+- b()
+- else:
+- return
+-
+- def b():
+- if self._ncall == 2:
+- self._ncall += 1
+- b()
+- else:
+- return
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- cfsaa = fsa.children[fsa]
+- cfsab = fsa.children[fsb]
+- cfsbb = fsb.children[fsb]
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 4)
+- self.assertEqual(fsb.ttot, 9)
+- self.assertEqual(fsb.tsub, 9)
+- self.assertEqual(cfsaa.ttot, 10)
+- self.assertEqual(cfsaa.tsub, 1)
+- self.assertEqual(cfsab.ttot, 9)
+- self.assertEqual(cfsab.tsub, 4)
+- self.assertEqual(cfsbb.ttot, 5)
+- self.assertEqual(cfsbb.tsub, 5)
+-
+- def test_abbb(self):
+- _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 1:
+- b()
+-
+- def b():
+- if self._ncall == 3:
+- return
+- self._ncall += 1
+- b()
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- cfsab = fsa.children[fsb]
+- cfsbb = fsb.children[fsb]
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 3)
+- self.assertEqual(fsb.ttot, 10)
+- self.assertEqual(fsb.tsub, 10)
+- self.assertEqual(fsb.ncall, 3)
+- self.assertEqual(fsb.nactualcall, 1)
+- self.assertEqual(cfsab.ttot, 10)
+- self.assertEqual(cfsab.tsub, 4)
+- self.assertEqual(cfsbb.ttot, 6)
+- self.assertEqual(cfsbb.tsub, 6)
+- self.assertEqual(cfsbb.nactualcall, 0)
+- self.assertEqual(cfsbb.ncall, 2)
+-
+- def test_aaab(self):
+- _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- if self._ncall == 3:
+- b()
+- return
+- self._ncall += 1
+- a()
+-
+- def b():
+- return
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- cfsaa = fsa.children[fsa]
+- cfsab = fsa.children[fsb]
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 12)
+- self.assertEqual(fsb.ttot, 1)
+- self.assertEqual(fsb.tsub, 1)
+- self.assertEqual(cfsaa.ttot, 10)
+- self.assertEqual(cfsaa.tsub, 9)
+- self.assertEqual(cfsab.ttot, 1)
+- self.assertEqual(cfsab.tsub, 1)
+-
+- def test_abab(self):
+- _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
+- _yappi._set_test_timings(_timings)
+-
+- self._ncall = 1
+-
+- def a():
+- b()
+-
+- def b():
+- if self._ncall == 2:
+- return
+- self._ncall += 1
+- a()
+-
+- stats = utils.run_and_get_func_stats(a)
+- fsa = utils.find_stat_by_name(stats, 'a')
+- fsb = utils.find_stat_by_name(stats, 'b')
+- cfsab = fsa.children[fsb]
+- cfsba = fsb.children[fsa]
+- self.assertEqual(fsa.ttot, 13)
+- self.assertEqual(fsa.tsub, 8)
+- self.assertEqual(fsb.ttot, 10)
+- self.assertEqual(fsb.tsub, 5)
+- self.assertEqual(cfsab.ttot, 10)
+- self.assertEqual(cfsab.tsub, 5)
+- self.assertEqual(cfsab.ncall, 2)
+- self.assertEqual(cfsab.nactualcall, 1)
+- self.assertEqual(cfsba.ttot, 6)
+- self.assertEqual(cfsba.tsub, 5)
+-
+-
+-if __name__ == '__main__':
+- # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
+- # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
+- unittest.main()
++import os
++import sys
++import time
++import threading
++import unittest
++import yappi
++import _yappi
+import tests.utils as utils
- import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6
- import subprocess
-
-diff --git a/tests/test_gevent.py b/tests/test_gevent.py
-index 8569712..fe15b29 100644
++import multiprocessing # added to fix http://bugs.python.org/issue15881 for > Py2.6
++import subprocess
++
++_counter = 0
++
++
++class BasicUsage(utils.YappiUnitTestCase):
++
++ def test_callback_function_int_return_overflow(self):
++ # this test is just here to check if any errors are generated, as the err
++ # is printed in C side, I did not include it here. THere are ways to test
++ # this deterministically, I did not bother
++ import ctypes
++
++ def _unsigned_overflow_margin():
++ return 2**(ctypes.sizeof(ctypes.c_void_p) * 8) - 1
++
++ def foo():
++ pass
++
++ #with utils.captured_output() as (out, err):
++ yappi.set_context_id_callback(_unsigned_overflow_margin)
++ yappi.set_tag_callback(_unsigned_overflow_margin)
++ yappi.start()
++ foo()
++
++ def test_issue60(self):
++
++ def foo():
++ buf = bytearray()
++ buf += b't' * 200
++ view = memoryview(buf)[10:]
++ view = view.tobytes()
++ del buf[:10] # this throws exception
++ return view
++
++ yappi.start(builtins=True)
++ foo()
++ self.assertTrue(
++ len(
++ yappi.get_func_stats(
++ filter_callback=lambda x: yappi.
++ func_matches(x, [memoryview.tobytes])
++ )
++ ) > 0
++ )
++ yappi.stop()
++
++ def test_issue54(self):
++
++ def _tag_cbk():
++ global _counter
++ _counter += 1
++ return _counter
++
++ def a():
++ pass
++
++ def b():
++ pass
++
++ yappi.set_tag_callback(_tag_cbk)
++ yappi.start()
++ a()
++ a()
++ a()
++ yappi.stop()
++ stats = yappi.get_func_stats()
++ self.assertEqual(stats.pop().ncall, 3) # aggregated if no tag is given
++ stats = yappi.get_func_stats(tag=1)
++
++ for i in range(1, 3):
++ stats = yappi.get_func_stats(tag=i)
++ stats = yappi.get_func_stats(
++ tag=i, filter_callback=lambda x: yappi.func_matches(x, [a])
++ )
++
++ stat = stats.pop()
++ self.assertEqual(stat.ncall, 1)
++
++ yappi.set_tag_callback(None)
++ yappi.clear_stats()
++ yappi.start()
++ b()
++ b()
++ stats = yappi.get_func_stats()
++ self.assertEqual(len(stats), 1)
++ stat = stats.pop()
++ self.assertEqual(stat.ncall, 2)
++
++ def test_filter(self):
++
++ def a():
++ pass
++
++ def b():
++ a()
++
++ def c():
++ b()
++
++ _TCOUNT = 5
++
++ ts = []
++ yappi.start()
++ for i in range(_TCOUNT):
++ t = threading.Thread(target=c)
++ t.start()
++ ts.append(t)
++
++ for t in ts:
++ t.join()
++
++ yappi.stop()
++
++ ctx_ids = []
++ for tstat in yappi.get_thread_stats():
++ if tstat.name == '_MainThread':
++ main_ctx_id = tstat.id
++ else:
++ ctx_ids.append(tstat.id)
++
++ fstats = yappi.get_func_stats(filter={"ctx_id": 9})
++ self.assertTrue(fstats.empty())
++ fstats = yappi.get_func_stats(
++ filter={
++ "ctx_id": main_ctx_id,
++ "name": "c"
++ }
++ ) # main thread
++ self.assertTrue(fstats.empty())
++
++ for i in ctx_ids:
++ fstats = yappi.get_func_stats(
++ filter={
++ "ctx_id": i,
++ "name": "a",
++ "ncall": 1
++ }
++ )
++ self.assertEqual(fstats.pop().ncall, 1)
++ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "b"})
++ self.assertEqual(fstats.pop().ncall, 1)
++ fstats = yappi.get_func_stats(filter={"ctx_id": i, "name": "c"})
++ self.assertEqual(fstats.pop().ncall, 1)
++
++ yappi.clear_stats()
++ yappi.start(builtins=True)
++ time.sleep(0.1)
++ yappi.stop()
++ fstats = yappi.get_func_stats(filter={"module": "time"})
++ self.assertEqual(len(fstats), 1)
++
++ # invalid filters`
++ self.assertRaises(
++ Exception, yappi.get_func_stats, filter={'tag': "sss"}
++ )
++ self.assertRaises(
++ Exception, yappi.get_func_stats, filter={'ctx_id': "None"}
++ )
++
++ def test_filter_callback(self):
++
++ def a():
++ time.sleep(0.1)
++
++ def b():
++ a()
++
++ def c():
++ pass
++
++ def d():
++ pass
++
++ yappi.set_clock_type("wall")
++ yappi.start(builtins=True)
++ a()
++ b()
++ c()
++ d()
++ stats = yappi.get_func_stats(
++ filter_callback=lambda x: yappi.func_matches(x, [a, b])
++ )
++ #stats.print_all()
++ r1 = '''
++ tests/test_functionality.py:98 a 2 0.000000 0.200350 0.100175
++ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.100197
++ '''
++ self.assert_traces_almost_equal(r1, stats)
++ self.assertEqual(len(stats), 2)
++ stats = yappi.get_func_stats(
++ filter_callback=lambda x: yappi.
++ module_matches(x, [sys.modules[__name__]])
++ )
++ r1 = '''
++ tests/test_functionality.py:98 a 2 0.000000 0.230130 0.115065
++ tests/test_functionality.py:101 b 1 0.000000 0.120000 0.109011
++ tests/test_functionality.py:104 c 1 0.000000 0.000002 0.000002
++ tests/test_functionality.py:107 d 1 0.000000 0.000001 0.000001
++ '''
++ self.assert_traces_almost_equal(r1, stats)
++ self.assertEqual(len(stats), 4)
++
++ stats = yappi.get_func_stats(
++ filter_callback=lambda x: yappi.func_matches(x, [time.sleep])
++ )
++ self.assertEqual(len(stats), 1)
++ r1 = '''
++ time.sleep 2 0.206804 0.220000 0.103402
++ '''
++ self.assert_traces_almost_equal(r1, stats)
++
++ def test_print_formatting(self):
++
++ def a():
++ pass
++
++ def b():
++ a()
++
++ func_cols = {
++ 1: ("name", 48),
++ 0: ("ncall", 5),
++ 2: ("tsub", 8),
++ }
++ thread_cols = {
++ 1: ("name", 48),
++ 0: ("ttot", 8),
++ }
++
++ yappi.start()
++ a()
++ b()
++ yappi.stop()
++ fs = yappi.get_func_stats()
++ cs = fs[1].children
++ ts = yappi.get_thread_stats()
++ #fs.print_all(out=sys.stderr, columns={1:("name", 70), })
++ #cs.print_all(out=sys.stderr, columns=func_cols)
++ #ts.print_all(out=sys.stderr, columns=thread_cols)
++ #cs.print_all(out=sys.stderr, columns={})
++
++ self.assertRaises(
++ yappi.YappiError, fs.print_all, columns={1: ("namee", 9)}
++ )
++ self.assertRaises(
++ yappi.YappiError, cs.print_all, columns={1: ("dd", 0)}
++ )
++ self.assertRaises(
++ yappi.YappiError, ts.print_all, columns={1: ("tidd", 0)}
++ )
++
++ def test_get_clock(self):
++ yappi.set_clock_type('cpu')
++ self.assertEqual('cpu', yappi.get_clock_type())
++ clock_info = yappi.get_clock_info()
++ self.assertTrue('api' in clock_info)
++ self.assertTrue('resolution' in clock_info)
++
++ yappi.set_clock_type('wall')
++ self.assertEqual('wall', yappi.get_clock_type())
++
++ t0 = yappi.get_clock_time()
++ time.sleep(0.1)
++ duration = yappi.get_clock_time() - t0
++ self.assertTrue(0.05 < duration < 0.3)
++
++ def test_profile_decorator(self):
++
++ def aggregate(func, stats):
++ fname = "tests/%s.profile" % (func.__name__)
++ try:
++ stats.add(fname)
++ except IOError:
++ pass
++ stats.save(fname)
++ raise Exception("messing around")
++
++ @yappi.profile(return_callback=aggregate)
++ def a(x, y):
++ if x + y == 25:
++ raise Exception("")
++ return x + y
++
++ def b():
++ pass
++
++ try:
++ os.remove(
++ "tests/a.profile"
++ ) # remove the one from prev test, if available
++ except:
++ pass
++
++ # global profile is on to mess things up
++ yappi.start()
++ b()
++
++ # assert functionality and call function at same time
++ try:
++ self.assertEqual(a(1, 2), 3)
++ except:
++ pass
++ try:
++ self.assertEqual(a(2, 5), 7)
++ except:
++ pass
++ try:
++ a(4, 21)
++ except:
++ pass
++ stats = yappi.get_func_stats().add("tests/a.profile")
++ fsa = utils.find_stat_by_name(stats, 'a')
++ self.assertEqual(fsa.ncall, 3)
++ self.assertEqual(len(stats), 1) # b() should be cleared out.
++
++ @yappi.profile(return_callback=aggregate)
++ def count_down_rec(n):
++ if n == 0:
++ return
++ count_down_rec(n - 1)
++
++ try:
++ os.remove(
++ "tests/count_down_rec.profile"
++ ) # remove the one from prev test, if available
++ except:
++ pass
++
++ try:
++ count_down_rec(4)
++ except:
++ pass
++ try:
++ count_down_rec(3)
++ except:
++ pass
++
++ stats = yappi.YFuncStats("tests/count_down_rec.profile")
++ fsrec = utils.find_stat_by_name(stats, 'count_down_rec')
++ self.assertEqual(fsrec.ncall, 9)
++ self.assertEqual(fsrec.nactualcall, 2)
++
++ def test_strip_dirs(self):
++
++ def a():
++ pass
++
++ stats = utils.run_and_get_func_stats(a, )
++ stats.strip_dirs()
++ fsa = utils.find_stat_by_name(stats, "a")
++ self.assertEqual(fsa.module, os.path.basename(fsa.module))
++
++ @unittest.skipIf(os.name == "nt", "do not run on Windows")
++ def test_run_as_script(self):
++ import re
++ p = subprocess.Popen(
++ ['yappi', os.path.join('./tests', 'run_as_script.py')],
++ stdout=subprocess.PIPE
++ )
++ out, err = p.communicate()
++ self.assertEqual(p.returncode, 0)
++ func_stats, thread_stats = re.split(
++ b'name\\s+id\\s+tid\\s+ttot\\s+scnt\\s*\n', out
++ )
++ self.assertTrue(b'FancyThread' in thread_stats)
++
++ def test_yappi_overhead(self):
++ LOOP_COUNT = 100000
++
++ def a():
++ pass
++
++ def b():
++ for i in range(LOOP_COUNT):
++ a()
++
++ t0 = time.time()
++ yappi.start()
++ b()
++ yappi.stop()
++ time_with_yappi = time.time() - t0
++ t0 = time.time()
++ b()
++ time_without_yappi = time.time() - t0
++ if time_without_yappi == 0:
++ time_without_yappi = 0.000001
++
++ # in latest v0.82, I calculated this as close to "7.0" in my machine.
++ # however, %83 of this overhead is coming from tickcount(). The other %17
++ # seems to have been evenly distributed to the internal bookkeeping
++ # structures/algorithms which seems acceptable. Note that our test only
++ # tests one function being profiled at-a-time in a short interval.
++ # profiling high number of functions in a small time
++ # is a different beast, (which is pretty unlikely in most applications)
++ # So as a conclusion: I cannot see any optimization window for Yappi that
++ # is worth implementing as we will only optimize %17 of the time.
++ sys.stderr.write("\r\nYappi puts %0.1f times overhead to the profiled application in average.\r\n" % \
++ (time_with_yappi / time_without_yappi))
++
++ def test_clear_stats_while_running(self):
++
++ def a():
++ pass
++
++ yappi.start()
++ a()
++ yappi.clear_stats()
++ a()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ self.assertEqual(fsa.ncall, 1)
++
++ def test_generator(self):
++
++ def _gen(n):
++ while (n > 0):
++ yield n
++ n -= 1
++
++ yappi.start()
++ for x in _gen(5):
++ pass
++ self.assertTrue(
++ yappi.convert2pstats(yappi.get_func_stats()) is not None
++ )
++
++ def test_slice_child_stats_and_strip_dirs(self):
++
++ def b():
++ for i in range(10000000):
++ pass
++
++ def a():
++ b()
++
++ yappi.start(builtins=True)
++ a()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ self.assertTrue(fsa.children[0:1] is not None)
++ prev_afullname = fsa.full_name
++ prev_bchildfullname = fsa.children[fsb].full_name
++ stats.strip_dirs()
++ self.assertTrue(len(prev_afullname) > len(fsa.full_name))
++ self.assertTrue(
++ len(prev_bchildfullname) > len(fsa.children[fsb].full_name)
++ )
++
++ def test_children_stat_functions(self):
++ _timings = {"a_1": 5, "b_1": 3, "c_1": 1}
++ _yappi._set_test_timings(_timings)
++
++ def b():
++ pass
++
++ def c():
++ pass
++
++ def a():
++ b()
++ c()
++
++ yappi.start()
++ a()
++ b() # non-child call
++ c() # non-child call
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ childs_of_a = fsa.children.get().sort("tavg", "desc")
++ prev_item = None
++ for item in childs_of_a:
++ if prev_item:
++ self.assertTrue(prev_item.tavg > item.tavg)
++ prev_item = item
++ childs_of_a.sort("name", "desc")
++ prev_item = None
++ for item in childs_of_a:
++ if prev_item:
++ self.assertTrue(prev_item.name > item.name)
++ prev_item = item
++ childs_of_a.clear()
++ self.assertTrue(childs_of_a.empty())
++
++ def test_no_stats_different_clock_type_load(self):
++
++ def a():
++ pass
++
++ yappi.start()
++ a()
++ yappi.stop()
++ yappi.get_func_stats().save("tests/ystats1.ys")
++ yappi.clear_stats()
++ yappi.set_clock_type("WALL")
++ yappi.start()
++ yappi.stop()
++ stats = yappi.get_func_stats().add("tests/ystats1.ys")
++ fsa = utils.find_stat_by_name(stats, 'a')
++ self.assertTrue(fsa is not None)
++
++ def test_subsequent_profile(self):
++ _timings = {"a_1": 1, "b_1": 1}
++ _yappi._set_test_timings(_timings)
++
++ def a():
++ pass
++
++ def b():
++ pass
++
++ yappi.start()
++ a()
++ yappi.stop()
++ yappi.start()
++ b()
++ yappi.stop()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ self.assertTrue(fsa is not None)
++ self.assertTrue(fsb is not None)
++ self.assertEqual(fsa.ttot, 1)
++ self.assertEqual(fsb.ttot, 1)
++
++ def test_lambda(self):
++ f = lambda: time.sleep(0.3)
++ yappi.set_clock_type("wall")
++ yappi.start()
++ f()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, '<lambda>')
++ self.assertTrue(fsa.ttot > 0.1)
++
++ def test_module_stress(self):
++ self.assertEqual(yappi.is_running(), False)
++
++ yappi.start()
++ yappi.clear_stats()
++ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
++
++ yappi.stop()
++ yappi.clear_stats()
++ yappi.set_clock_type("cpu")
++ self.assertRaises(yappi.YappiError, yappi.set_clock_type, "dummy")
++ self.assertEqual(yappi.is_running(), False)
++ yappi.clear_stats()
++ yappi.clear_stats()
++
++ def test_stat_sorting(self):
++ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ b()
++
++ def b():
++ if self._ncall == 2:
++ return
++ self._ncall += 1
++ a()
++
++ stats = utils.run_and_get_func_stats(a)
++ stats = stats.sort("totaltime", "desc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.ttot >= stat.ttot)
++ prev_stat = stat
++ stats = stats.sort("totaltime", "asc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.ttot <= stat.ttot)
++ prev_stat = stat
++ stats = stats.sort("avgtime", "asc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.tavg <= stat.tavg)
++ prev_stat = stat
++ stats = stats.sort("name", "asc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.name <= stat.name)
++ prev_stat = stat
++ stats = stats.sort("subtime", "asc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.tsub <= stat.tsub)
++ prev_stat = stat
++
++ self.assertRaises(
++ yappi.YappiError, stats.sort, "invalid_func_sorttype_arg"
++ )
++ self.assertRaises(
++ yappi.YappiError, stats.sort, "totaltime",
++ "invalid_func_sortorder_arg"
++ )
++
++ def test_start_flags(self):
++ self.assertEqual(_yappi._get_start_flags(), None)
++ yappi.start()
++
++ def a():
++ pass
++
++ a()
++ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
++ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
++ self.assertEqual(len(yappi.get_thread_stats()), 1)
++
++ def test_builtin_profiling(self):
++
++ def a():
++ time.sleep(0.4) # is a builtin function
++
++ yappi.set_clock_type('wall')
++
++ yappi.start(builtins=True)
++ a()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'sleep')
++ self.assertTrue(fsa is not None)
++ self.assertTrue(fsa.ttot > 0.3)
++ yappi.stop()
++ yappi.clear_stats()
++
++ def a():
++ pass
++
++ yappi.start()
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ stats = yappi.get_func_stats()
++
++ def test_singlethread_profiling(self):
++ yappi.set_clock_type('wall')
++
++ def a():
++ time.sleep(0.2)
++
++ class Worker1(threading.Thread):
++
++ def a(self):
++ time.sleep(0.3)
++
++ def run(self):
++ self.a()
++
++ yappi.start(profile_threads=False)
++
++ c = Worker1()
++ c.start()
++ c.join()
++ a()
++ stats = yappi.get_func_stats()
++ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
++ fsa2 = utils.find_stat_by_name(stats, 'a')
++ self.assertTrue(fsa1 is None)
++ self.assertTrue(fsa2 is not None)
++ self.assertTrue(fsa2.ttot > 0.1)
++
++ def test_run(self):
++
++ def profiled():
++ pass
++
++ yappi.clear_stats()
++ try:
++ with yappi.run():
++ profiled()
++ stats = yappi.get_func_stats()
++ finally:
++ yappi.clear_stats()
++
++ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
++
++ def test_run_recursive(self):
++
++ def profiled():
++ pass
++
++ def not_profiled():
++ pass
++
++ yappi.clear_stats()
++ try:
++ with yappi.run():
++ with yappi.run():
++ profiled()
++ # Profiling stopped here
++ not_profiled()
++ stats = yappi.get_func_stats()
++ finally:
++ yappi.clear_stats()
++
++ self.assertIsNotNone(utils.find_stat_by_name(stats, 'profiled'))
++ self.assertIsNone(utils.find_stat_by_name(stats, 'not_profiled'))
++
++
++class StatSaveScenarios(utils.YappiUnitTestCase):
++
++ def test_pstats_conversion(self):
++
++ def pstat_id(fs):
++ return (fs.module, fs.lineno, fs.name)
++
++ def a():
++ d()
++
++ def b():
++ d()
++
++ def c():
++ pass
++
++ def d():
++ pass
++
++ _timings = {"a_1": 12, "b_1": 7, "c_1": 5, "d_1": 2}
++ _yappi._set_test_timings(_timings)
++ stats = utils.run_and_get_func_stats(a, )
++ stats.strip_dirs()
++ stats.save("tests/a1.pstats", type="pstat")
++ fsa_pid = pstat_id(utils.find_stat_by_name(stats, "a"))
++ fsd_pid = pstat_id(utils.find_stat_by_name(stats, "d"))
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ stats = utils.run_and_get_func_stats(a, )
++ stats.strip_dirs()
++ stats.save("tests/a2.pstats", type="pstat")
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ stats = utils.run_and_get_func_stats(b, )
++ stats.strip_dirs()
++ stats.save("tests/b1.pstats", type="pstat")
++ fsb_pid = pstat_id(utils.find_stat_by_name(stats, "b"))
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ stats = utils.run_and_get_func_stats(c, )
++ stats.strip_dirs()
++ stats.save("tests/c1.pstats", type="pstat")
++ fsc_pid = pstat_id(utils.find_stat_by_name(stats, "c"))
++
++ # merge saved stats and check pstats values are correct
++ import pstats
++ p = pstats.Stats(
++ 'tests/a1.pstats', 'tests/a2.pstats', 'tests/b1.pstats',
++ 'tests/c1.pstats'
++ )
++ p.strip_dirs()
++ # ct = ttot, tt = tsub
++ (cc, nc, tt, ct, callers) = p.stats[fsa_pid]
++ self.assertEqual(cc, nc, 2)
++ self.assertEqual(tt, 20)
++ self.assertEqual(ct, 24)
++ (cc, nc, tt, ct, callers) = p.stats[fsd_pid]
++ self.assertEqual(cc, nc, 3)
++ self.assertEqual(tt, 6)
++ self.assertEqual(ct, 6)
++ self.assertEqual(len(callers), 2)
++ (cc, nc, tt, ct) = callers[fsa_pid]
++ self.assertEqual(cc, nc, 2)
++ self.assertEqual(tt, 4)
++ self.assertEqual(ct, 4)
++ (cc, nc, tt, ct) = callers[fsb_pid]
++ self.assertEqual(cc, nc, 1)
++ self.assertEqual(tt, 2)
++ self.assertEqual(ct, 2)
++
++ def test_merge_stats(self):
++ _timings = {
++ "a_1": 15,
++ "b_1": 14,
++ "c_1": 12,
++ "d_1": 10,
++ "e_1": 9,
++ "f_1": 7,
++ "g_1": 6,
++ "h_1": 5,
++ "i_1": 1
++ }
++ _yappi._set_test_timings(_timings)
++
++ def a():
++ b()
++
++ def b():
++ c()
++
++ def c():
++ d()
++
++ def d():
++ e()
++
++ def e():
++ f()
++
++ def f():
++ g()
++
++ def g():
++ h()
++
++ def h():
++ i()
++
++ def i():
++ pass
++
++ yappi.start()
++ a()
++ a()
++ yappi.stop()
++ stats = yappi.get_func_stats()
++ self.assertRaises(
++ NotImplementedError, stats.save, "", "INVALID_SAVE_TYPE"
++ )
++ stats.save("tests/ystats2.ys")
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ yappi.start()
++ a()
++ stats = yappi.get_func_stats().add("tests/ystats2.ys")
++ fsa = utils.find_stat_by_name(stats, "a")
++ fsb = utils.find_stat_by_name(stats, "b")
++ fsc = utils.find_stat_by_name(stats, "c")
++ fsd = utils.find_stat_by_name(stats, "d")
++ fse = utils.find_stat_by_name(stats, "e")
++ fsf = utils.find_stat_by_name(stats, "f")
++ fsg = utils.find_stat_by_name(stats, "g")
++ fsh = utils.find_stat_by_name(stats, "h")
++ fsi = utils.find_stat_by_name(stats, "i")
++ self.assertEqual(fsa.ttot, 45)
++ self.assertEqual(fsa.ncall, 3)
++ self.assertEqual(fsa.nactualcall, 3)
++ self.assertEqual(fsa.tsub, 3)
++ self.assertEqual(fsa.children[fsb].ttot, fsb.ttot)
++ self.assertEqual(fsa.children[fsb].tsub, fsb.tsub)
++ self.assertEqual(fsb.children[fsc].ttot, fsc.ttot)
++ self.assertEqual(fsb.children[fsc].tsub, fsc.tsub)
++ self.assertEqual(fsc.tsub, 6)
++ self.assertEqual(fsc.children[fsd].ttot, fsd.ttot)
++ self.assertEqual(fsc.children[fsd].tsub, fsd.tsub)
++ self.assertEqual(fsd.children[fse].ttot, fse.ttot)
++ self.assertEqual(fsd.children[fse].tsub, fse.tsub)
++ self.assertEqual(fse.children[fsf].ttot, fsf.ttot)
++ self.assertEqual(fse.children[fsf].tsub, fsf.tsub)
++ self.assertEqual(fsf.children[fsg].ttot, fsg.ttot)
++ self.assertEqual(fsf.children[fsg].tsub, fsg.tsub)
++ self.assertEqual(fsg.ttot, 18)
++ self.assertEqual(fsg.tsub, 3)
++ self.assertEqual(fsg.children[fsh].ttot, fsh.ttot)
++ self.assertEqual(fsg.children[fsh].tsub, fsh.tsub)
++ self.assertEqual(fsh.ttot, 15)
++ self.assertEqual(fsh.tsub, 12)
++ self.assertEqual(fsh.tavg, 5)
++ self.assertEqual(fsh.children[fsi].ttot, fsi.ttot)
++ self.assertEqual(fsh.children[fsi].tsub, fsi.tsub)
++ #stats.debug_print()
++
++ def test_merge_multithreaded_stats(self):
++ import _yappi
++ timings = {"a_1": 2, "b_1": 1}
++ _yappi._set_test_timings(timings)
++
++ def a():
++ pass
++
++ def b():
++ pass
++
++ yappi.start()
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ t = threading.Thread(target=b)
++ t.start()
++ t.join()
++ yappi.get_func_stats().save("tests/ystats1.ys")
++ yappi.clear_stats()
++ _yappi._set_test_timings(timings)
++ self.assertEqual(len(yappi.get_func_stats()), 0)
++ self.assertEqual(len(yappi.get_thread_stats()), 1)
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++
++ self.assertEqual(_yappi._get_start_flags()["profile_builtins"], 0)
++ self.assertEqual(_yappi._get_start_flags()["profile_multicontext"], 1)
++ yappi.get_func_stats().save("tests/ystats2.ys")
++
++ stats = yappi.YFuncStats([
++ "tests/ystats1.ys",
++ "tests/ystats2.ys",
++ ])
++ fsa = utils.find_stat_by_name(stats, "a")
++ fsb = utils.find_stat_by_name(stats, "b")
++ self.assertEqual(fsa.ncall, 2)
++ self.assertEqual(fsb.ncall, 1)
++ self.assertEqual(fsa.tsub, fsa.ttot, 4)
++ self.assertEqual(fsb.tsub, fsb.ttot, 1)
++
++ def test_merge_load_different_clock_types(self):
++ yappi.start(builtins=True)
++
++ def a():
++ b()
++
++ def b():
++ c()
++
++ def c():
++ pass
++
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ yappi.get_func_stats().sort("name", "asc").save("tests/ystats1.ys")
++ yappi.stop()
++ yappi.clear_stats()
++ yappi.start(builtins=False)
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ yappi.get_func_stats().save("tests/ystats2.ys")
++ yappi.stop()
++ self.assertRaises(_yappi.error, yappi.set_clock_type, "wall")
++ yappi.clear_stats()
++ yappi.set_clock_type("wall")
++ yappi.start()
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ yappi.get_func_stats().save("tests/ystats3.ys")
++ self.assertRaises(
++ yappi.YappiError,
++ yappi.YFuncStats().add("tests/ystats1.ys").add, "tests/ystats3.ys"
++ )
++ stats = yappi.YFuncStats(["tests/ystats1.ys",
++ "tests/ystats2.ys"]).sort("name")
++ fsa = utils.find_stat_by_name(stats, "a")
++ fsb = utils.find_stat_by_name(stats, "b")
++ fsc = utils.find_stat_by_name(stats, "c")
++ self.assertEqual(fsa.ncall, 2)
++ self.assertEqual(fsa.ncall, fsb.ncall, fsc.ncall)
++
++ def test_merge_aabab_aabbc(self):
++ _timings = {
++ "a_1": 15,
++ "a_2": 14,
++ "b_1": 12,
++ "a_3": 10,
++ "b_2": 9,
++ "c_1": 4
++ }
++ _yappi._set_test_timings(_timings)
++
++ def a():
++ if self._ncall == 1:
++ self._ncall += 1
++ a()
++ elif self._ncall == 5:
++ self._ncall += 1
++ a()
++ else:
++ b()
++
++ def b():
++ if self._ncall == 2:
++ self._ncall += 1
++ a()
++ elif self._ncall == 6:
++ self._ncall += 1
++ b()
++ elif self._ncall == 7:
++ c()
++ else:
++ return
++
++ def c():
++ pass
++
++ self._ncall = 1
++ stats = utils.run_and_get_func_stats(a, )
++ stats.save("tests/ystats1.ys")
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ #stats.print_all()
++
++ self._ncall = 5
++ stats = utils.run_and_get_func_stats(a, )
++ stats.save("tests/ystats2.ys")
++
++ #stats.print_all()
++
++ def a(): # same name but another function(code object)
++ pass
++
++ yappi.start()
++ a()
++ stats = yappi.get_func_stats().add(
++ ["tests/ystats1.ys", "tests/ystats2.ys"]
++ )
++ #stats.print_all()
++ self.assertEqual(len(stats), 4)
++
++ fsa = None
++ for stat in stats:
++ if stat.name == "a" and stat.ttot == 45:
++ fsa = stat
++ break
++ self.assertTrue(fsa is not None)
++
++ self.assertEqual(fsa.ncall, 7)
++ self.assertEqual(fsa.nactualcall, 3)
++ self.assertEqual(fsa.ttot, 45)
++ self.assertEqual(fsa.tsub, 10)
++ fsb = utils.find_stat_by_name(stats, "b")
++ fsc = utils.find_stat_by_name(stats, "c")
++ self.assertEqual(fsb.ncall, 6)
++ self.assertEqual(fsb.nactualcall, 3)
++ self.assertEqual(fsb.ttot, 36)
++ self.assertEqual(fsb.tsub, 27)
++ self.assertEqual(fsb.tavg, 6)
++ self.assertEqual(fsc.ttot, 8)
++ self.assertEqual(fsc.tsub, 8)
++ self.assertEqual(fsc.tavg, 4)
++ self.assertEqual(fsc.nactualcall, fsc.ncall, 2)
++
++
++class MultithreadedScenarios(utils.YappiUnitTestCase):
++
++ def test_issue_32(self):
++ '''
++ Start yappi from different thread and we get Internal Error(15) as
++ the current_ctx_id() called while enumerating the threads in start()
++ and as it does not swap to the enumerated ThreadState* the THreadState_GetDict()
++ returns wrong object and thus sets an invalid id for the _ctx structure.
++
++ When this issue happens multiple Threads have same tid as the internal ts_ptr
++ will be same for different contexts. So, let's see if that happens
++ '''
++
++ def foo():
++ time.sleep(0.2)
++
++ def bar():
++ time.sleep(0.1)
++
++ def thread_func():
++ yappi.set_clock_type("wall")
++ yappi.start()
++
++ bar()
++
++ t = threading.Thread(target=thread_func)
++ t.start()
++ t.join()
++
++ foo()
++
++ yappi.stop()
++
++ thread_ids = set()
++ for tstat in yappi.get_thread_stats():
++ self.assertTrue(tstat.tid not in thread_ids)
++ thread_ids.add(tstat.tid)
++
++ def test_subsequent_profile(self):
++ WORKER_COUNT = 5
++
++ def a():
++ pass
++
++ def b():
++ pass
++
++ def c():
++ pass
++
++ _timings = {
++ "a_1": 3,
++ "b_1": 2,
++ "c_1": 1,
++ }
++
++ yappi.start()
++
++ def g():
++ pass
++
++ g()
++ yappi.stop()
++ yappi.clear_stats()
++ _yappi._set_test_timings(_timings)
++ yappi.start()
++
++ _dummy = []
++ for i in range(WORKER_COUNT):
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ for i in range(WORKER_COUNT):
++ t = threading.Thread(target=b)
++ t.start()
++ _dummy.append(t)
++ t.join()
++ for i in range(WORKER_COUNT):
++ t = threading.Thread(target=a)
++ t.start()
++ t.join()
++ for i in range(WORKER_COUNT):
++ t = threading.Thread(target=c)
++ t.start()
++ t.join()
++ yappi.stop()
++ yappi.start()
++
++ def f():
++ pass
++
++ f()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ self.assertEqual(fsa.ncall, 10)
++ self.assertEqual(fsb.ncall, 5)
++ self.assertEqual(fsc.ncall, 5)
++ self.assertEqual(fsa.ttot, fsa.tsub, 30)
++ self.assertEqual(fsb.ttot, fsb.tsub, 10)
++ self.assertEqual(fsc.ttot, fsc.tsub, 5)
++
++ # MACOSx optimizes by only creating one worker thread
++ self.assertTrue(len(yappi.get_thread_stats()) >= 2)
++
++ def test_basic(self):
++ yappi.set_clock_type('wall')
++
++ def dummy():
++ pass
++
++ def a():
++ time.sleep(0.2)
++
++ class Worker1(threading.Thread):
++
++ def a(self):
++ time.sleep(0.3)
++
++ def run(self):
++ self.a()
++
++ yappi.start(builtins=False, profile_threads=True)
++
++ c = Worker1()
++ c.start()
++ c.join()
++ a()
++ stats = yappi.get_func_stats()
++ fsa1 = utils.find_stat_by_name(stats, 'Worker1.a')
++ fsa2 = utils.find_stat_by_name(stats, 'a')
++ self.assertTrue(fsa1 is not None)
++ self.assertTrue(fsa2 is not None)
++ self.assertTrue(fsa1.ttot > 0.2)
++ self.assertTrue(fsa2.ttot > 0.1)
++ tstats = yappi.get_thread_stats()
++ self.assertEqual(len(tstats), 2)
++ tsa = utils.find_stat_by_name(tstats, 'Worker1')
++ tsm = utils.find_stat_by_name(tstats, '_MainThread')
++ dummy() # call dummy to force ctx name to be retrieved again.
++ self.assertTrue(tsa is not None)
++ # TODO: I put dummy() to fix below, remove the comments after a while.
++ self.assertTrue( # FIX: I see this fails sometimes?
++ tsm is not None,
++ 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(tstats))))
++
++ def test_ctx_stats(self):
++ from threading import Thread
++ DUMMY_WORKER_COUNT = 5
++ yappi.start()
++
++ class DummyThread(Thread):
++ pass
++
++ def dummy():
++ pass
++
++ def dummy_worker():
++ pass
++
++ for i in range(DUMMY_WORKER_COUNT):
++ t = DummyThread(target=dummy_worker)
++ t.start()
++ t.join()
++ yappi.stop()
++ stats = yappi.get_thread_stats()
++ tsa = utils.find_stat_by_name(stats, "DummyThread")
++ self.assertTrue(tsa is not None)
++ yappi.clear_stats()
++ time.sleep(1.0)
++ _timings = {
++ "a_1": 6,
++ "b_1": 5,
++ "c_1": 3,
++ "d_1": 1,
++ "a_2": 4,
++ "b_2": 3,
++ "c_2": 2,
++ "d_2": 1
++ }
++ _yappi._set_test_timings(_timings)
++
++ class Thread1(Thread):
++ pass
++
++ class Thread2(Thread):
++ pass
++
++ def a():
++ b()
++
++ def b():
++ c()
++
++ def c():
++ d()
++
++ def d():
++ time.sleep(0.6)
++
++ yappi.set_clock_type("wall")
++ yappi.start()
++ t1 = Thread1(target=a)
++ t1.start()
++ t2 = Thread2(target=a)
++ t2.start()
++ t1.join()
++ t2.join()
++ stats = yappi.get_thread_stats()
++
++ # the fist clear_stats clears the context table?
++ tsa = utils.find_stat_by_name(stats, "DummyThread")
++ self.assertTrue(tsa is None)
++
++ tst1 = utils.find_stat_by_name(stats, "Thread1")
++ tst2 = utils.find_stat_by_name(stats, "Thread2")
++ tsmain = utils.find_stat_by_name(stats, "_MainThread")
++ dummy() # call dummy to force ctx name to be retrieved again.
++ self.assertTrue(len(stats) == 3)
++ self.assertTrue(tst1 is not None)
++ self.assertTrue(tst2 is not None)
++ # TODO: I put dummy() to fix below, remove the comments after a while.
++ self.assertTrue( # FIX: I see this fails sometimes
++ tsmain is not None,
++ 'Could not find "_MainThread". Found: %s' % (', '.join(utils.get_stat_names(stats))))
++ self.assertTrue(1.0 > tst2.ttot >= 0.5)
++ self.assertTrue(1.0 > tst1.ttot >= 0.5)
++
++ # test sorting of the ctx stats
++ stats = stats.sort("totaltime", "desc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.ttot >= stat.ttot)
++ prev_stat = stat
++ stats = stats.sort("totaltime", "asc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.ttot <= stat.ttot)
++ prev_stat = stat
++ stats = stats.sort("schedcount", "desc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.sched_count >= stat.sched_count)
++ prev_stat = stat
++ stats = stats.sort("name", "desc")
++ prev_stat = None
++ for stat in stats:
++ if prev_stat:
++ self.assertTrue(prev_stat.name.lower() >= stat.name.lower())
++ prev_stat = stat
++ self.assertRaises(
++ yappi.YappiError, stats.sort, "invalid_thread_sorttype_arg"
++ )
++ self.assertRaises(
++ yappi.YappiError, stats.sort, "invalid_thread_sortorder_arg"
++ )
++
++ def test_ctx_stats_cpu(self):
++
++ def get_thread_name():
++ try:
++ return threading.current_thread().name
++ except AttributeError:
++ return "Anonymous"
++
++ def burn_cpu(sec):
++ t0 = yappi.get_clock_time()
++ elapsed = 0
++ while (elapsed < sec):
++ for _ in range(1000):
++ pass
++ elapsed = yappi.get_clock_time() - t0
++
++ def test():
++
++ ts = []
++ for i in (0.01, 0.05, 0.1):
++ t = threading.Thread(target=burn_cpu, args=(i, ))
++ t.name = "burn_cpu-%s" % str(i)
++ t.start()
++ ts.append(t)
++ for t in ts:
++ t.join()
++
++ yappi.set_clock_type("cpu")
++ yappi.set_context_name_callback(get_thread_name)
++
++ yappi.start()
++
++ test()
++
++ yappi.stop()
++
++ tstats = yappi.get_thread_stats()
++ r1 = '''
++ burn_cpu-0.1 3 123145356058624 0.100105 8
++ burn_cpu-0.05 2 123145361313792 0.050149 8
++ burn_cpu-0.01 1 123145356058624 0.010127 2
++ MainThread 0 4321620864 0.001632 6
++ '''
++ self.assert_ctx_stats_almost_equal(r1, tstats)
++
++ def test_producer_consumer_with_queues(self):
++ # we currently just stress yappi, no functionality test is done here.
++ yappi.start()
++ if utils.is_py3x():
++ from queue import Queue
++ else:
++ from Queue import Queue
++ from threading import Thread
++ WORKER_THREAD_COUNT = 50
++ WORK_ITEM_COUNT = 2000
++
++ def worker():
++ while True:
++ item = q.get()
++ # do the work with item
++ q.task_done()
++
++ q = Queue()
++ for i in range(WORKER_THREAD_COUNT):
++ t = Thread(target=worker)
++ t.daemon = True
++ t.start()
++
++ for item in range(WORK_ITEM_COUNT):
++ q.put(item)
++ q.join() # block until all tasks are done
++ #yappi.get_func_stats().sort("callcount").print_all()
++ yappi.stop()
++
++ def test_temporary_lock_waiting(self):
++ yappi.start()
++ _lock = threading.Lock()
++
++ def worker():
++ _lock.acquire()
++ try:
++ time.sleep(1.0)
++ finally:
++ _lock.release()
++
++ t1 = threading.Thread(target=worker)
++ t2 = threading.Thread(target=worker)
++ t1.start()
++ t2.start()
++ t1.join()
++ t2.join()
++ #yappi.get_func_stats().sort("callcount").print_all()
++ yappi.stop()
++
++ @unittest.skipIf(os.name != "posix", "requires Posix compliant OS")
++ def test_signals_with_blocking_calls(self):
++ import signal, os, time
++
++ # just to verify if signal is handled correctly and stats/yappi are not corrupted.
++ def handler(signum, frame):
++ raise Exception("Signal handler executed!")
++
++ yappi.start()
++ signal.signal(signal.SIGALRM, handler)
++ signal.alarm(1)
++ self.assertRaises(Exception, time.sleep, 2)
++ stats = yappi.get_func_stats()
++ fsh = utils.find_stat_by_name(stats, "handler")
++ self.assertTrue(fsh is not None)
++
++ @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
++ def test_concurrent_futures(self):
++ yappi.start()
++ from concurrent.futures import ThreadPoolExecutor
++ with ThreadPoolExecutor(max_workers=5) as executor:
++ f = executor.submit(pow, 5, 2)
++ self.assertEqual(f.result(), 25)
++ time.sleep(1.0)
++ yappi.stop()
++
++ @unittest.skipIf(not sys.version_info >= (3, 2), "requires Python 3.2")
++ def test_barrier(self):
++ yappi.start()
++ b = threading.Barrier(2, timeout=1)
++
++ def worker():
++ try:
++ b.wait()
++ except threading.BrokenBarrierError:
++ pass
++ except Exception:
++ raise Exception("BrokenBarrierError not raised")
++
++ t1 = threading.Thread(target=worker)
++ t1.start()
++ #b.wait()
++ t1.join()
++ yappi.stop()
++
++
++class NonRecursiveFunctions(utils.YappiUnitTestCase):
++
++ def test_abcd(self):
++ _timings = {"a_1": 6, "b_1": 5, "c_1": 3, "d_1": 1}
++ _yappi._set_test_timings(_timings)
++
++ def a():
++ b()
++
++ def b():
++ c()
++
++ def c():
++ d()
++
++ def d():
++ pass
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ fsd = utils.find_stat_by_name(stats, 'd')
++ cfsab = fsa.children[fsb]
++ cfsbc = fsb.children[fsc]
++ cfscd = fsc.children[fsd]
++
++ self.assertEqual(fsa.ttot, 6)
++ self.assertEqual(fsa.tsub, 1)
++ self.assertEqual(fsb.ttot, 5)
++ self.assertEqual(fsb.tsub, 2)
++ self.assertEqual(fsc.ttot, 3)
++ self.assertEqual(fsc.tsub, 2)
++ self.assertEqual(fsd.ttot, 1)
++ self.assertEqual(fsd.tsub, 1)
++ self.assertEqual(cfsab.ttot, 5)
++ self.assertEqual(cfsab.tsub, 2)
++ self.assertEqual(cfsbc.ttot, 3)
++ self.assertEqual(cfsbc.tsub, 2)
++ self.assertEqual(cfscd.ttot, 1)
++ self.assertEqual(cfscd.tsub, 1)
++
++ def test_stop_in_middle(self):
++ _timings = {"a_1": 6, "b_1": 4}
++ _yappi._set_test_timings(_timings)
++
++ def a():
++ b()
++ yappi.stop()
++
++ def b():
++ time.sleep(0.2)
++
++ yappi.start()
++ a()
++ stats = yappi.get_func_stats()
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++
++ self.assertEqual(fsa.ncall, 1)
++ self.assertEqual(fsa.nactualcall, 0)
++ self.assertEqual(fsa.ttot, 0) # no call_leave called
++ self.assertEqual(fsa.tsub, 0) # no call_leave called
++ self.assertEqual(fsb.ttot, 4)
++
++
++class RecursiveFunctions(utils.YappiUnitTestCase):
++
++ def test_fibonacci(self):
++
++ def fib(n):
++ if n > 1:
++ return fib(n - 1) + fib(n - 2)
++ else:
++ return n
++
++ stats = utils.run_and_get_func_stats(fib, 22)
++ fs = utils.find_stat_by_name(stats, 'fib')
++ self.assertEqual(fs.ncall, 57313)
++ self.assertEqual(fs.ttot, fs.tsub)
++
++ def test_abcadc(self):
++ _timings = {
++ "a_1": 20,
++ "b_1": 19,
++ "c_1": 17,
++ "a_2": 13,
++ "d_1": 12,
++ "c_2": 10,
++ "a_3": 5
++ }
++ _yappi._set_test_timings(_timings)
++
++ def a(n):
++ if n == 3:
++ return
++ if n == 1 + 1:
++ d(n)
++ else:
++ b(n)
++
++ def b(n):
++ c(n)
++
++ def c(n):
++ a(n + 1)
++
++ def d(n):
++ c(n)
++
++ stats = utils.run_and_get_func_stats(a, 1)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ fsd = utils.find_stat_by_name(stats, 'd')
++ self.assertEqual(fsa.ncall, 3)
++ self.assertEqual(fsa.nactualcall, 1)
++ self.assertEqual(fsa.ttot, 20)
++ self.assertEqual(fsa.tsub, 7)
++ self.assertEqual(fsb.ttot, 19)
++ self.assertEqual(fsb.tsub, 2)
++ self.assertEqual(fsc.ttot, 17)
++ self.assertEqual(fsc.tsub, 9)
++ self.assertEqual(fsd.ttot, 12)
++ self.assertEqual(fsd.tsub, 2)
++ cfsca = fsc.children[fsa]
++ self.assertEqual(cfsca.nactualcall, 0)
++ self.assertEqual(cfsca.ncall, 2)
++ self.assertEqual(cfsca.ttot, 13)
++ self.assertEqual(cfsca.tsub, 6)
++
++ def test_aaaa(self):
++ _timings = {"d_1": 9, "d_2": 7, "d_3": 3, "d_4": 2}
++ _yappi._set_test_timings(_timings)
++
++ def d(n):
++ if n == 3:
++ return
++ d(n + 1)
++
++ stats = utils.run_and_get_func_stats(d, 0)
++ fsd = utils.find_stat_by_name(stats, 'd')
++ self.assertEqual(fsd.ncall, 4)
++ self.assertEqual(fsd.nactualcall, 1)
++ self.assertEqual(fsd.ttot, 9)
++ self.assertEqual(fsd.tsub, 9)
++ cfsdd = fsd.children[fsd]
++ self.assertEqual(cfsdd.ttot, 7)
++ self.assertEqual(cfsdd.tsub, 7)
++ self.assertEqual(cfsdd.ncall, 3)
++ self.assertEqual(cfsdd.nactualcall, 0)
++
++ def test_abcabc(self):
++ _timings = {
++ "a_1": 20,
++ "b_1": 19,
++ "c_1": 17,
++ "a_2": 13,
++ "b_2": 11,
++ "c_2": 9,
++ "a_3": 6
++ }
++ _yappi._set_test_timings(_timings)
++
++ def a(n):
++ if n == 3:
++ return
++ else:
++ b(n)
++
++ def b(n):
++ c(n)
++
++ def c(n):
++ a(n + 1)
++
++ stats = utils.run_and_get_func_stats(a, 1)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ self.assertEqual(fsa.ncall, 3)
++ self.assertEqual(fsa.nactualcall, 1)
++ self.assertEqual(fsa.ttot, 20)
++ self.assertEqual(fsa.tsub, 9)
++ self.assertEqual(fsb.ttot, 19)
++ self.assertEqual(fsb.tsub, 4)
++ self.assertEqual(fsc.ttot, 17)
++ self.assertEqual(fsc.tsub, 7)
++ cfsab = fsa.children[fsb]
++ cfsbc = fsb.children[fsc]
++ cfsca = fsc.children[fsa]
++ self.assertEqual(cfsab.ttot, 19)
++ self.assertEqual(cfsab.tsub, 4)
++ self.assertEqual(cfsbc.ttot, 17)
++ self.assertEqual(cfsbc.tsub, 7)
++ self.assertEqual(cfsca.ttot, 13)
++ self.assertEqual(cfsca.tsub, 8)
++
++ def test_abcbca(self):
++ _timings = {"a_1": 10, "b_1": 9, "c_1": 7, "b_2": 4, "c_2": 2, "a_2": 1}
++ _yappi._set_test_timings(_timings)
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 1:
++ b()
++ else:
++ return
++
++ def b():
++ c()
++
++ def c():
++ if self._ncall == 1:
++ self._ncall += 1
++ b()
++ else:
++ a()
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ cfsab = fsa.children[fsb]
++ cfsbc = fsb.children[fsc]
++ cfsca = fsc.children[fsa]
++ self.assertEqual(fsa.ttot, 10)
++ self.assertEqual(fsa.tsub, 2)
++ self.assertEqual(fsb.ttot, 9)
++ self.assertEqual(fsb.tsub, 4)
++ self.assertEqual(fsc.ttot, 7)
++ self.assertEqual(fsc.tsub, 4)
++ self.assertEqual(cfsab.ttot, 9)
++ self.assertEqual(cfsab.tsub, 2)
++ self.assertEqual(cfsbc.ttot, 7)
++ self.assertEqual(cfsbc.tsub, 4)
++ self.assertEqual(cfsca.ttot, 1)
++ self.assertEqual(cfsca.tsub, 1)
++ self.assertEqual(cfsca.ncall, 1)
++ self.assertEqual(cfsca.nactualcall, 0)
++
++ def test_aabccb(self):
++ _timings = {
++ "a_1": 13,
++ "a_2": 11,
++ "b_1": 9,
++ "c_1": 5,
++ "c_2": 3,
++ "b_2": 1
++ }
++ _yappi._set_test_timings(_timings)
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 1:
++ self._ncall += 1
++ a()
++ else:
++ b()
++
++ def b():
++ if self._ncall == 3:
++ return
++ else:
++ c()
++
++ def c():
++ if self._ncall == 2:
++ self._ncall += 1
++ c()
++ else:
++ b()
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ fsc = utils.find_stat_by_name(stats, 'c')
++ cfsaa = fsa.children[fsa.index]
++ cfsab = fsa.children[fsb]
++ cfsbc = fsb.children[fsc.full_name]
++ cfscc = fsc.children[fsc]
++ cfscb = fsc.children[fsb]
++ self.assertEqual(fsb.ttot, 9)
++ self.assertEqual(fsb.tsub, 5)
++ self.assertEqual(cfsbc.ttot, 5)
++ self.assertEqual(cfsbc.tsub, 2)
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 4)
++ self.assertEqual(cfsab.ttot, 9)
++ self.assertEqual(cfsab.tsub, 4)
++ self.assertEqual(cfsaa.ttot, 11)
++ self.assertEqual(cfsaa.tsub, 2)
++ self.assertEqual(fsc.ttot, 5)
++ self.assertEqual(fsc.tsub, 4)
++
++ def test_abaa(self):
++ _timings = {"a_1": 13, "b_1": 10, "a_2": 9, "a_3": 5}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 1:
++ b()
++ elif self._ncall == 2:
++ self._ncall += 1
++ a()
++ else:
++ return
++
++ def b():
++ self._ncall += 1
++ a()
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ cfsaa = fsa.children[fsa]
++ cfsba = fsb.children[fsa]
++ self.assertEqual(fsb.ttot, 10)
++ self.assertEqual(fsb.tsub, 1)
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 12)
++ self.assertEqual(cfsaa.ttot, 5)
++ self.assertEqual(cfsaa.tsub, 5)
++ self.assertEqual(cfsba.ttot, 9)
++ self.assertEqual(cfsba.tsub, 4)
++
++ def test_aabb(self):
++ _timings = {"a_1": 13, "a_2": 10, "b_1": 9, "b_2": 5}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 1:
++ self._ncall += 1
++ a()
++ elif self._ncall == 2:
++ b()
++ else:
++ return
++
++ def b():
++ if self._ncall == 2:
++ self._ncall += 1
++ b()
++ else:
++ return
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ cfsaa = fsa.children[fsa]
++ cfsab = fsa.children[fsb]
++ cfsbb = fsb.children[fsb]
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 4)
++ self.assertEqual(fsb.ttot, 9)
++ self.assertEqual(fsb.tsub, 9)
++ self.assertEqual(cfsaa.ttot, 10)
++ self.assertEqual(cfsaa.tsub, 1)
++ self.assertEqual(cfsab.ttot, 9)
++ self.assertEqual(cfsab.tsub, 4)
++ self.assertEqual(cfsbb.ttot, 5)
++ self.assertEqual(cfsbb.tsub, 5)
++
++ def test_abbb(self):
++ _timings = {"a_1": 13, "b_1": 10, "b_2": 6, "b_3": 1}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 1:
++ b()
++
++ def b():
++ if self._ncall == 3:
++ return
++ self._ncall += 1
++ b()
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ cfsab = fsa.children[fsb]
++ cfsbb = fsb.children[fsb]
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 3)
++ self.assertEqual(fsb.ttot, 10)
++ self.assertEqual(fsb.tsub, 10)
++ self.assertEqual(fsb.ncall, 3)
++ self.assertEqual(fsb.nactualcall, 1)
++ self.assertEqual(cfsab.ttot, 10)
++ self.assertEqual(cfsab.tsub, 4)
++ self.assertEqual(cfsbb.ttot, 6)
++ self.assertEqual(cfsbb.tsub, 6)
++ self.assertEqual(cfsbb.nactualcall, 0)
++ self.assertEqual(cfsbb.ncall, 2)
++
++ def test_aaab(self):
++ _timings = {"a_1": 13, "a_2": 10, "a_3": 6, "b_1": 1}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ if self._ncall == 3:
++ b()
++ return
++ self._ncall += 1
++ a()
++
++ def b():
++ return
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ cfsaa = fsa.children[fsa]
++ cfsab = fsa.children[fsb]
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 12)
++ self.assertEqual(fsb.ttot, 1)
++ self.assertEqual(fsb.tsub, 1)
++ self.assertEqual(cfsaa.ttot, 10)
++ self.assertEqual(cfsaa.tsub, 9)
++ self.assertEqual(cfsab.ttot, 1)
++ self.assertEqual(cfsab.tsub, 1)
++
++ def test_abab(self):
++ _timings = {"a_1": 13, "b_1": 10, "a_2": 6, "b_2": 1}
++ _yappi._set_test_timings(_timings)
++
++ self._ncall = 1
++
++ def a():
++ b()
++
++ def b():
++ if self._ncall == 2:
++ return
++ self._ncall += 1
++ a()
++
++ stats = utils.run_and_get_func_stats(a)
++ fsa = utils.find_stat_by_name(stats, 'a')
++ fsb = utils.find_stat_by_name(stats, 'b')
++ cfsab = fsa.children[fsb]
++ cfsba = fsb.children[fsa]
++ self.assertEqual(fsa.ttot, 13)
++ self.assertEqual(fsa.tsub, 8)
++ self.assertEqual(fsb.ttot, 10)
++ self.assertEqual(fsb.tsub, 5)
++ self.assertEqual(cfsab.ttot, 10)
++ self.assertEqual(cfsab.tsub, 5)
++ self.assertEqual(cfsab.ncall, 2)
++ self.assertEqual(cfsab.nactualcall, 1)
++ self.assertEqual(cfsba.ttot, 6)
++ self.assertEqual(cfsba.tsub, 5)
++
++
++if __name__ == '__main__':
++ # import sys;sys.argv = ['', 'BasicUsage.test_run_as_script']
++ # import sys;sys.argv = ['', 'MultithreadedScenarios.test_subsequent_profile']
++ unittest.main()
--- a/tests/test_gevent.py
+++ b/tests/test_gevent.py
@@ -4,7 +4,7 @@ import yappi
@@ -65,8 +3881,6 @@ index 8569712..fe15b29 100644
YappiUnitTestCase, find_stat_by_name, burn_cpu, burn_io,
burn_io_gevent
)
-diff --git a/tests/test_hooks.py b/tests/test_hooks.py
-index a96a4f1..e4177ba 100644
--- a/tests/test_hooks.py
+++ b/tests/test_hooks.py
@@ -5,7 +5,7 @@ import unittest
@@ -78,8 +3892,6 @@ index a96a4f1..e4177ba 100644
def a():
-diff --git a/tests/test_tags.py b/tests/test_tags.py
-index b0b531d..1928888 100644
--- a/tests/test_tags.py
+++ b/tests/test_tags.py
@@ -2,7 +2,7 @@ import unittest
@@ -91,6 +3903,3 @@ index b0b531d..1928888 100644
class MultiThreadTests(YappiUnitTestCase):
---
-2.31.1
-