Source code for pymemtrace.debug_malloc_stats

"""
This is a wrapper around sys._debugmallocstats which writes to C stderr.
We capture this and can diff two calls to sys._debugmallocatats.

sys._debugmallocatats Implementation (Python 3.9)
=================================================

``sys__debugmallocstats_impl(PyObject *module)``

Calls:

``_PyObject_DebugMallocStats(stderr))``

Then:

``_PyObject_DebugTypeStats(stderr);``

Memory Usage by the Python Object Allocator
---------------------------------------------

``_PyObject_DebugMallocStats is defined in Objects/obmalloc.c:``

Which dumps out the arenas/pools/blocks.

Memory Usage by Type
------------------------------

``_PyObject_DebugTypeStats`` is defined in ``Objects/object.c``

This calls::

    _PyDict_DebugMallocStats(out); // in Objects/dictobject.c, just calls _PyDebugAllocatorStats in Objects/obmalloc.c
    _PyFloat_DebugMallocStats(out); // in Objects/dictobject.c, just calls _PyDebugAllocatorStats in Objects/obmalloc.c
    _PyFrame_DebugMallocStats(out); // etc.
    _PyList_DebugMallocStats(out);
    _PyTuple_DebugMallocStats(out); // in Objects/tupleobject.c, calls _PyDebugAllocatorStats in Objects/obmalloc.c for (i = 1; i < PyTuple_MAXSAVESIZE; i++)

Note that only dict, float, frame, list, tuple are reported.
"""
import io
import re
import sys
import typing

from pymemtrace import redirect_stdout


[docs]def get_debugmallocstats() -> bytes: """Invokes sys._debugmallocstats and captures the output as bytes.""" stream = io.BytesIO() with redirect_stdout.stderr_redirector(stream): sys._debugmallocstats() return stream.getvalue()
# return stream.getvalue().decode('ascii', 'replace') #: In ``Object/obmalloc.c``: #: #: .. code-block:: text #: #: #define POOL_SIZE SYSTEM_PAGE_SIZE /* must be 2^N */ #: POOL_SIZE = 4096 #: This value is initially approximate. #: In ``Object/obmalloc.c``: #: #: .. code-block:: c #: #: #define POOL_OVERHEAD _Py_SIZE_ROUND_UP(sizeof(struct pool_header), ALIGNMENT) #: #: We can calculate this from the sum of num_pools divided into ``'# bytes lost to pool headers'``. #: This is done whenever a :py:class:`SysDebugMallocStats` is created. POOL_OVERHEAD = 48
[docs]class DebugMallocStat(typing.NamedTuple): """Represents a single line in the malloc stats section. For example: .. code-block:: text class size num pools blocks in use avail blocks ----- ---- --------- ------------- ------------ 0 16 2 297 209 Nomenclature is from ``_PyObject_DebugMallocStats(stderr))`` in ``Objects/obmalloc.c``. Typical implementation: .. code-block:: c for (i = 0; i < numclasses; ++i) { size_t p = numpools[i]; size_t b = numblocks[i]; size_t f = numfreeblocks[i]; uint size = INDEX2SIZE(i); if (p == 0) { assert(b == 0 && f == 0); continue; } fprintf(out, "%5u %6u " "%11" PY_FORMAT_SIZE_T "u " "%15" PY_FORMAT_SIZE_T "u " "%13" PY_FORMAT_SIZE_T "u\\n", i, size, p, b, f); allocated_bytes += b * size; available_bytes += f * size; pool_header_bytes += p * POOL_OVERHEAD; quantization += p * ((POOL_SIZE - POOL_OVERHEAD) % size); } fputc('\\n', out); """ block_class: int size: int num_pools: int blocks_in_use: int avail_blocks: int # @property # def bytes_in_use(self)-> int: # return self.size * self.num_pools * self.blocks_in_use @property def allocated_bytes(self) -> int: return self.blocks_in_use * self.size @property def available_bytes(self) -> int: return self.avail_blocks * self.size @property def pool_header_bytes(self) -> int: return self.num_pools * POOL_OVERHEAD @property def quantization(self) -> int: return self.num_pools * ((POOL_SIZE - POOL_OVERHEAD) % self.size)
[docs] def __repr__(self): """Representation of self of the form: .. code-block:: text 0 16 4 777 235 """ return f'{self.block_class:5d} {self.size:5d} {self.num_pools:10d} {self.blocks_in_use:14d} {self.avail_blocks:12d}'
[docs]def diff_debug_malloc_stat(a: DebugMallocStat, b: DebugMallocStat) -> str: """Takes two DebugMallocStat objects and returns a string with the difference. The string is of similar format to the input from ``sys._debugmallocstats``.""" if a.block_class != b.block_class: raise ValueError(f'a.block_class != b.block_class: {a.block_class} != {b.block_class}') if a.size != b.size: raise ValueError(f'a.size != b.size: {a.size} != {b.size}') return ( f'{a.block_class:5d}' f' {a.size:5d}' f' {b.num_pools - a.num_pools:+10d}' f' {b.blocks_in_use - a.blocks_in_use:+14d}' f' {b.avail_blocks - a.avail_blocks:+12d}')
#: Matches:: #: #: b' 0 16 2 294 212' #: #: Decomposed to extract the five integers. RE_DEBUG_MALLOC_STATS_LINE = re.compile(rb'^\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)$')
[docs]def last_value_as_int(line: bytes) -> int: """Returns that last value of the line such as: .. code-block:: text b'# arenas allocated total = 2,033' """ return int(line.split(b'=')[1].replace(b',', b''))
#: Matches:: #: #: b'18 arenas * 262144 bytes/arena = 4,718,592' #: #: Decomposed to extract the two integers, the total is computed. RE_DEBUG_MALLOC_ARENAS_SUMMARY_LINE = re.compile(rb'^(\d+) arenas \* (\d+) bytes/arena\s+=\s+(.+)$')
[docs]class DebugMallocArenas: """Decomposes this:: # arenas allocated total = 2,033 # arenas reclaimed = 2,015 # arenas highwater mark = 18 # arenas allocated current = 18 18 arenas * 262144 bytes/arena = 4,718,592 Into values for: ntimes_arena_allocated, narenas, narenas_highwater, ARENA_SIZE. Infers: arenas_reclaimed, arenas_total. From Object/obmalloc.c: .. code-block:: c (void)printone(out, "# arenas allocated total", ntimes_arena_allocated); (void)printone(out, "# arenas reclaimed", ntimes_arena_allocated - narenas); (void)printone(out, "# arenas highwater mark", narenas_highwater); (void)printone(out, "# arenas allocated current", narenas); /* Total number of times malloc() called to allocate an arena. */ static size_t ntimes_arena_allocated = 0; // b'# arenas allocated total = 2,033' /* High water mark (max value ever seen) for narenas_currently_allocated. */ static size_t narenas_highwater = 0; // b'# arenas highwater mark = 18' /* # of arenas actually allocated. */ size_t narenas = 0; // b'# arenas allocated current = 18' PyOS_snprintf(buf, sizeof(buf), "%" PY_FORMAT_SIZE_T "u arenas * %d bytes/arena", narenas, ARENA_SIZE); (void)printone(out, buf, narenas * ARENA_SIZE); // b'18 arenas * 262144 bytes/arena = 4,718,592' // Simple calculation: 18 * 262144 = 4718592 """
[docs] def __init__(self, debug_malloc: bytes): """Constructor, decomposes this: .. code-block:: text # arenas allocated total -> self.ntimes_arena_allocated # arenas reclaimed -> self.arenas_reclaimed # arenas highwater mark -> self.narenas_highwater # arenas allocated current -> self.narenas 18 arenas * 262144 bytes/arena -> self.narenas * self.arena_size = self.arenas_total """ for line in debug_malloc.splitlines(keepends=False): if line.startswith(b'# arenas allocated total'): # (void)printone(out, "# arenas allocated total", ntimes_arena_allocated); self.ntimes_arena_allocated = last_value_as_int(line) elif line.startswith(b'# arenas reclaimed'): # (void)printone(out, "# arenas reclaimed", ntimes_arena_allocated - narenas); pass elif line.startswith(b'# arenas highwater mark'): # (void)printone(out, "# arenas highwater mark", narenas_highwater); self.narenas_highwater = last_value_as_int(line) elif line.startswith(b'# arenas allocated current'): # (void)printone(out, "# arenas allocated current", narenas); self.narenas = last_value_as_int(line) else: m = RE_DEBUG_MALLOC_ARENAS_SUMMARY_LINE.match(line) if m: # PyOS_snprintf(buf, sizeof(buf), "%" PY_FORMAT_SIZE_T "u arenas * %d bytes/arena", narenas, ARENA_SIZE); # (void)printone(out, buf, narenas * ARENA_SIZE); # # b'18 arenas * 262144 bytes/arena = 4,718,592' # Simple calculation: 18 * 262144 = 4718592 self.arena_size = int(m.group(2)) expected_attrs = ('ntimes_arena_allocated', 'narenas_highwater', 'narenas', 'arena_size') if not all(hasattr(self, name) for name in expected_attrs): raise ValueError(f'Can not find required attributes {expected_attrs}')
@property def arenas_reclaimed(self) -> int: return self.ntimes_arena_allocated - self.narenas @property def arenas_total(self) -> int: return self.narenas * self.arena_size
[docs] def __repr__(self): """Returns a string similar to sys._debugmallocstats.""" ret= [ f'# arenas allocated total = {self.ntimes_arena_allocated:>20,d}', f'# arenas reclaimed = {self.arenas_reclaimed:>20,d}', f'# arenas highwater mark = {self.narenas_highwater:>20,d}', f'# arenas allocated current = {self.narenas:>20,d}', ] lhs = f'{self.narenas} arenas * {self.arena_size} bytes/arena' ret.append(f'{lhs:<34s} = {self.arenas_total:>20,d}') return '\n'.join(ret)
#: Matches:: #: #: b'63 unused pools * 4096 bytes = 258,048' #: #: Decomposed to extract the two integers, the total is computed. RE_DEBUG_MALLOC_POOLS_SUMMARY_LINE = re.compile(rb'^(\d+) unused pools \* (\d+) bytes\s+=\s+(.+)$')
[docs]class DebugMallocPoolsBlocks: """Decomposes this:: # bytes in allocated blocks = 4,280,848 # bytes in available blocks = 70,368 63 unused pools * 4096 bytes = 258,048 # bytes lost to pool headers = 52,272 # bytes lost to quantization = 57,056 # bytes lost to arena alignment = 0 Total = 4,718,592 From Object/obmalloc.c:: total = printone(out, "# bytes in allocated blocks", allocated_bytes); total += printone(out, "# bytes in available blocks", available_bytes); PyOS_snprintf(buf, sizeof(buf), "%u unused pools * %d bytes", numfreepools, POOL_SIZE); total += printone(out, buf, (size_t)numfreepools * POOL_SIZE); total += printone(out, "# bytes lost to pool headers", pool_header_bytes); total += printone(out, "# bytes lost to quantization", quantization); total += printone(out, "# bytes lost to arena alignment", arena_alignment); (void)printone(out, "Total", total); Extracts: allocated_bytes, available_bytes, numfreepools, POOL_SIZE, pool_header_bytes, quantization, arena_alignment. Infers: unused_pool_total, TOTAL. """
[docs] def __init__(self, debug_malloc: bytes): for line in debug_malloc.splitlines(keepends=False): if line.startswith(b'# bytes in allocated blocks'): # total = printone(out, "# bytes in allocated blocks", allocated_bytes); self.allocated_bytes = last_value_as_int(line) elif line.startswith(b'# bytes in available blocks'): # total += printone(out, "# bytes in available blocks", available_bytes); self.available_bytes = last_value_as_int(line) elif line.startswith(b'# bytes lost to pool headers'): # total += printone(out, "# bytes lost to pool headers", pool_header_bytes); self.pool_header_bytes = last_value_as_int(line) elif line.startswith(b'# bytes lost to quantization'): # total += printone(out, "# bytes lost to quantization", quantization); self.quantization = last_value_as_int(line) elif line.startswith(b'# bytes lost to arena alignment'): # total += printone(out, "# bytes lost to arena alignment", arena_alignment); self.arena_alignment = last_value_as_int(line) else: m = RE_DEBUG_MALLOC_POOLS_SUMMARY_LINE.match(line) if m: # PyOS_snprintf(buf, sizeof(buf), "%u unused pools * %d bytes", numfreepools, POOL_SIZE); # total += printone(out, buf, (size_t)numfreepools * POOL_SIZE); # # 63 unused pools * 4096 bytes = 258,048 # Simple calculation: 63 * 4096 = 258048 self.numfreepools = int(m.group(1)) self.pool_size = int(m.group(2)) expected_attrs = ( 'allocated_bytes', 'available_bytes', 'pool_header_bytes', 'quantization', 'arena_alignment', 'numfreepools', 'pool_size' ) if not all(hasattr(self, name) for name in expected_attrs): raise ValueError(f'Can not find required attributes {expected_attrs}')
@property def unused_pool_total(self) -> int: return self.numfreepools * self.pool_size @property def total(self) -> int: return self.allocated_bytes + self.available_bytes + self.unused_pool_total + self.pool_header_bytes \ + self.quantization + self.arena_alignment
[docs] def pool_overhead(self, num_pools: int) -> int: """Returns the POOL_OVERHEAD as self.pool_header_bytes // the number of pools. self.pool_header_bytes comes from:: # bytes lost to pool headers = 51,264 Number of pools comes from the sum of ``num pools`` from DebugMallocStat.num_pools """ return self.pool_header_bytes // num_pools
[docs] def __repr__(self): """Returns a string similar to sys._debugmallocstats.""" ret = [ f'# bytes in allocated blocks = {self.allocated_bytes:>20,d}', f'# bytes in available blocks = {self.available_bytes:>20,d}', ] lhs = f'{self.numfreepools} unused pools * {self.pool_size} bytes' ret.append(f'{lhs:<34s} = {self.unused_pool_total:>20,d}') ret.extend([ f'# bytes lost to pool headers = {self.pool_header_bytes:>20,d}', f'# bytes lost to quantization = {self.quantization:>20,d}', f'# bytes lost to arena alignment = {self.arena_alignment:>20,d}', f'Total = {self.total:>20,d}', ]) return '\n'.join(ret)
[docs]class DebugTypeStat(typing.NamedTuple): """Represents a single line from ``sys._debugmallocstats``. Decomposed from a line such as: .. code-block:: text 4 free PyCFunctionObjects * 56 bytes each = 224 See ``_PyObject_DebugTypeStats(stderr);`` in ``Objects/obmalloc.c`` """ free_count: int object_type: str bytes_each: int bytes_total: int
[docs] def __repr__(self): """Returns a string of the form of these lines: .. code-block:: text 4 free PyCFunctionObjects * 56 bytes each = 224 9 free PyDictObjects * 48 bytes each = 432 5 free PyFloatObjects * 24 bytes each = 120 0 free PyFrameObjects * 368 bytes each = 0 80 free PyListObjects * 40 bytes each = 3,200 8 free PyMethodObjects * 48 bytes each = 384 7 free 1-sized PyTupleObjects * 32 bytes each = 224 52 free 2-sized PyTupleObjects * 40 bytes each = 2,080 1 free 3-sized PyTupleObjects * 48 bytes each = 48 0 free 10-sized PyTupleObjects * 104 bytes each = 0 """ lhs = f'{self.free_count} free {self.object_type} * {self.bytes_each} bytes each' return f'{lhs:>48s} = {self.bytes_total:>20,d}'
#: Matches:: #: #: b' 34 free 2-sized PyTupleObjects * 40 bytes each = 1,360' #: #: NOTE: commas in last value caused by printone() in Object/obmalloc.c #: printone is used in many places where there is message = value such as memory pool totals and type memory information. RE_DEBUG_MALLOC_TYPE_LINE = re.compile(rb'^\s*(\d+) free (.+?) \* (\d+) bytes each =\s+(.+)$')
[docs]def diff_debug_type_stat(a: DebugTypeStat, b: DebugTypeStat) -> str: """Takes two DebugMallocStat objects and returns a string with the difference. The string is of similar format to the input from ``sys._debugmallocstats``.""" if a.object_type != b.object_type: raise ValueError(f'a.object_type != b.object_type: {a.object_type} != {b.object_type}') if a.bytes_each != b.bytes_each: raise ValueError(f'a.bytes_each != b.bytes_each: {a.bytes_each} != {b.bytes_each}') lhs = f'{b.free_count - a.free_count:+d} free {a.object_type} * {a.bytes_each} bytes each' return f'{lhs:>48s} = {b.bytes_total - a.bytes_total:>+20,d}'
#: Matches:: #: #: b'Small block threshold = 512, in 32 size classes.' #: #: Decomposed to extract the two integers. RE_DEBUG_MALLOC_HEADER_LINE = re.compile(rb'^Small block threshold = (\d+), in (\d+) size classes\.$')
[docs]class SysDebugMallocStats: """This decomposes the output of ``sys._debugmallocstats`` into these areas: - A list of malloc stats showing the pools and blocks. - Descriptions of arenas. - Descriptions of pools and blocks. - A list of malloc usage by (some) types. This class takes a snapshot of the debug malloc stats from ``sys._debugmallocstats``. Importantly it can identify the difference between two snapshots. """
[docs] def __init__(self, debug_malloc: bytes = b''): """Constructor, this optionally takes a bytes object for testing. If nothing supplied this gets the bytes object from sys._debugmallocstats. """ self.malloc_stats: typing.List[DebugMallocStat] = [] self.type_stats: typing.List[DebugTypeStat] = [] # Used for lookup by type self.type_map: typing.Dict[bytes, int] = {} if not debug_malloc: debug_malloc: bytes = get_debugmallocstats() for line in debug_malloc.splitlines(keepends=False): m = RE_DEBUG_MALLOC_STATS_LINE.match(line) if m: self.malloc_stats.append(DebugMallocStat(*[int(v) for v in m.groups()])) else: m = RE_DEBUG_MALLOC_TYPE_LINE.match(line) if m: self.type_map[m.group(2)] = len(self.type_stats) self.type_stats.append(DebugTypeStat( int(m.group(1)), m.group(2).decode('ascii'), int(m.group(3)), int(m.group(4).replace(b',', b'')) )) else: m = RE_DEBUG_MALLOC_HEADER_LINE.match(line) if m: self.small_block_threshold = int(m.group(1)) self.size_classes = int(m.group(2)) self.arenas = DebugMallocArenas(debug_malloc) self.pools_blocks = DebugMallocPoolsBlocks(debug_malloc) expected_attrs = ('small_block_threshold', 'size_classes') if not all(hasattr(self, name) for name in expected_attrs): raise ValueError(f'Can not find required attributes {expected_attrs}') # Set the global POOL_OVERHEAD by combining malloc_stats and pools_blocks num_pools = sum(v.num_pools for v in self.malloc_stats) pool_overhead = self.pools_blocks.pool_overhead(num_pools) global POOL_OVERHEAD POOL_OVERHEAD = pool_overhead
[docs] def object_types(self) -> typing.KeysView[bytes]: """Return all the known object types.""" return self.type_map.keys()
[docs] def has_object_type(self, object_type: bytes): """Return True if the object type is present.""" return object_type in self.type_map
[docs] def type_stat(self, object_type: bytes) -> DebugTypeStat: """Return the DebugTypeStat for the named object type. May raise an KeyError if the object_type doe not exist.""" return self.type_stats[self.type_map[object_type]]
[docs] def __repr__(self): """Representation of self similar to the output of sys._debugmallocstats""" results = [ f'Small block threshold = {self.small_block_threshold}, in {self.size_classes} size classes.', '', 'class size num pools blocks in use avail blocks', '----- ---- --------- ------------- ------------', ] for stat in self.malloc_stats: results.append(repr(stat)) results.append('') results.append(repr(self.arenas)) results.append('') results.append(repr(self.pools_blocks)) results.append('') for stat in self.type_stats: results.append(repr(stat)) # Terminate with a '\n' results.append('') return '\n'.join(results)
# def debugmallocstats_to_malloc_blocks() -> typing.List[DebugMallocStats]: # debug_malloc: bytes = get_debugmallocstats() # ret = [] # for line in debug_malloc.splitlines(keepends=False): # m = RE_DEBUG_MALLOC_STATS_LINE.match(line) # if m: # ret.append(DebugMallocStats(*[int(v) for v in m.groups()])) # return ret
[docs]def diff_debugmallocstats(a_stats: SysDebugMallocStats, b_stats: SysDebugMallocStats): """ This takes two SysDebugMallocStats objects and identifies what is different between them. The diff is a list of lines of identical form to :py:meth:`sys._debugmallocstats` with '+' or '-' where appropriate. Lines that are the same are omitted. """ ret: typing.List[str] = [] has_header = False # Firstly the DebugMallocStat list if len(a_stats.malloc_stats) != len(b_stats.malloc_stats): raise ValueError( f'Malloc stats length mismatch {len(a_stats.malloc_stats)} != {len(b_stats.malloc_stats)}' ) for a, b in zip(a_stats.malloc_stats, b_stats.malloc_stats): if a != b: if not has_header: ret.append('class size num pools blocks in use avail blocks') ret.append('----- ---- --------- ------------- ------------') has_header = True ret.append(diff_debug_malloc_stat(a, b)) has_header = False # Central two blocks # Arenas block = [] if b_stats.arenas.ntimes_arena_allocated != a_stats.arenas.ntimes_arena_allocated: block.append( f'# arenas allocated total =' f' {b_stats.arenas.ntimes_arena_allocated - a_stats.arenas.ntimes_arena_allocated:>+20,d}' ) if b_stats.arenas.arenas_reclaimed != a_stats.arenas.arenas_reclaimed: block.append( f'# arenas reclaimed =' f' {b_stats.arenas.arenas_reclaimed - a_stats.arenas.arenas_reclaimed:>+20,d}' ) if b_stats.arenas.narenas_highwater != a_stats.arenas.narenas_highwater: block.append( f'# arenas highwater mark =' f' {b_stats.arenas.narenas_highwater - a_stats.arenas.narenas_highwater:>+20,d}' ) assert a_stats.arenas.arena_size == b_stats.arenas.arena_size if b_stats.arenas.narenas != a_stats.arenas.narenas: block.append( f'# arenas allocated current =' f' {b_stats.arenas.narenas - a_stats.arenas.narenas:>+20,d}' ) lhs = f'{b_stats.arenas.narenas - a_stats.arenas.narenas:+d} arenas * {a_stats.arenas.arena_size} bytes/arena' block.append(f'{lhs:<34s} = {b_stats.arenas.arenas_total - a_stats.arenas.arenas_total:>+20,d}') if block: ret.append('') ret.extend(block) # Pools and blocks block = [] assert a_stats.pools_blocks.pool_size == b_stats.pools_blocks.pool_size if b_stats.pools_blocks.allocated_bytes != a_stats.pools_blocks.allocated_bytes: block.append( f'# bytes in allocated blocks =' f' {b_stats.pools_blocks.allocated_bytes - a_stats.pools_blocks.allocated_bytes:>+20,d}' ) if b_stats.pools_blocks.available_bytes != a_stats.pools_blocks.available_bytes: block.append( f'# bytes in available blocks =' f' {b_stats.pools_blocks.available_bytes - a_stats.pools_blocks.available_bytes:>+20,d}' ) if b_stats.pools_blocks.numfreepools != a_stats.pools_blocks.numfreepools: lhs = ( f'{b_stats.pools_blocks.numfreepools - a_stats.pools_blocks.numfreepools}' f' unused pools * {a_stats.pools_blocks.pool_size} bytes' ) block.append( f'{lhs:<34s} = {b_stats.pools_blocks.unused_pool_total - a_stats.pools_blocks.unused_pool_total:>+20,d}' ) if b_stats.pools_blocks.pool_header_bytes != a_stats.pools_blocks.pool_header_bytes: block.append( f'# bytes lost to pool headers =' f' {b_stats.pools_blocks.pool_header_bytes - a_stats.pools_blocks.pool_header_bytes:>+20,d}' ) if b_stats.pools_blocks.quantization != a_stats.pools_blocks.quantization: block.append( f'# bytes lost to quantization =' f' {b_stats.pools_blocks.quantization - a_stats.pools_blocks.quantization:>+20,d}' ) if b_stats.pools_blocks.arena_alignment != a_stats.pools_blocks.arena_alignment: block.append( f'# bytes lost to arena alignment =' f' {b_stats.pools_blocks.arena_alignment - a_stats.pools_blocks.arena_alignment:>+20,d}' ) if b_stats.pools_blocks.total != a_stats.pools_blocks.total: block.append( f'Total = {b_stats.pools_blocks.total - a_stats.pools_blocks.total:>+20,d}' ) if block: ret.append('') ret.extend(block) has_header = False # The DebugTypeStat objects, the lists might not be the same. union_object_types = set(a_stats.object_types()) | set(b_stats.object_types()) for object_type in sorted(union_object_types): diff_str = '' if a_stats.has_object_type(object_type): if b_stats.has_object_type(object_type): if a_stats.type_stat(object_type) != b_stats.type_stat(object_type): diff_str = diff_debug_type_stat(a_stats.type_stat(object_type), b_stats.type_stat(object_type)) else: # a only so dropped diff_str = f'-{a_stats.type_stat(object_type)!r}' else: # b only so added diff_str = f'+{a_stats.type_stat(object_type)!r}' if diff_str: if not has_header: ret.append('') has_header = True ret.append(diff_str) return ret
[docs]class DiffSysDebugMallocStats: """Context manager that compares two snapshots of ``sys._getdebugmallocstats()`` and can provide a diff between them."""
[docs] def __init__(self): self.before: typing.Optional[SysDebugMallocStats] = None self.after: typing.Optional[SysDebugMallocStats] = None self._diff: typing.Optional[str] = None
[docs] def __enter__(self): """Enters the context manager taking a snapshot of ``sys._getdebugmallocstats()``.""" self.before = SysDebugMallocStats() self.after = None return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Exits the context manager taking a snapshot of ``sys._getdebugmallocstats()``.""" self.after = SysDebugMallocStats() return False
[docs] def diff(self) -> str: """Returns the difference between two snapshots.""" if self.before is None: raise RuntimeError('Context manager not entered.') if self.after is None: raise RuntimeError('Context manager not exited.') return '\n'.join(diff_debugmallocstats(self.before, self.after))
def main(): print('sys._debugmallocstats()') print(get_debugmallocstats().decode('ascii')) print() # dms_a = SysDebugMallocStats() # # print(repr(dms_a)) # l = [] # l.append({}) # l.append(set()) # l.append((1, 2, 3)) # for i in range(80): # l.append(tuple(list(range(4)))) # dms_b = SysDebugMallocStats() # # # pprint.pprint(diff_debugmallocstats(dms_a, dms_b)) # print('\n'.join(diff_debugmallocstats(dms_a, dms_b))) print(f'POOL_OVERHEAD {POOL_OVERHEAD}') with DiffSysDebugMallocStats() as diff_dms: l = [] l.append({}) l.append(set()) l.append((1, 2, 3)) for i in range(80): l.append(tuple(list(range(4)))) print(f'POOL_OVERHEAD {POOL_OVERHEAD}') print(diff_dms.diff()) return 0 if __name__ == '__main__': sys.exit(main())