Source code for catalyst.debug.instruments
# Copyright 2024 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrumentation module to report Catalyst & program performance.
"""
import copy
import datetime
import functools
import os
import platform
import sys
import time
import typing
from contextlib import contextmanager
# pylint: disable=f-string-without-interpolation,line-too-long
# Tested in LIT test suite.
# pragma: no cover
## API ##
[docs]
@contextmanager
def instrumentation(session_name, filename=None, detailed=False):
"""Instrumentation session to output information on wall time, CPU time,
and intermediate program size of a program during compilation and execution.
Args:
session_name (str): identifier to distinguish multiple sessions or runs within the same result file
filename (str): Desired path to write results to in YAML format. If ``None``, the
results will instead be printed to the console.
detailed (bool): Whether to instrument fine-grained steps in the compiler and runtime.
If ``False``, only high-level steps such as "program capture" and
"compilation" are reported.
**Example**
Printing an instrumentation session to the console:
>>> @qjit
... def expensive_function(a, b):
... return a + b
>>> with debug.instrumentation("session_name", detailed=False):
... expensive_function(1, 2)
[DIAGNOSTICS] Running capture walltime: 3.299 ms cputime: 3.294 ms programsize: 0 lines
[DIAGNOSTICS] Running generate_ir walltime: 4.228 ms cputime: 4.225 ms programsize: 14 lines
[DIAGNOSTICS] Running compile walltime: 57.182 ms cputime: 12.109 ms programsize: 121 lines
[DIAGNOSTICS] Running run walltime: 1.075 ms cputime: 1.072 ms
We can also write an instrumentation session to a YAML file:
>>> with debug.instrumentation("session_name", filename="session.yml", detailed=False):
... expensive_function(1, 2)
>>> with open('session.yml', 'r') as f:
... print(f.read())
2024-04-29 18:19:29.349886:
name: session_name
system:
os: Linux-6.1.58+-x86_64-with-glibc2.35
arch: x86_64
python: 3.10.12
results:
- capture:
walltime: 6.296216
cputime: 2.715764
programsize: 0
- generate_ir:
walltime: 8.84289
cputime: 8.836589
programsize: 14
- compile:
walltime: 199.249725
cputime: 38.820425
programsize: 121
- run:
walltime: 1.053613
cputime: 1.019584
"""
# A session cannot be tied to the creation of a QJIT object
# nor its lifetime, because it involves both compile time and runtime measurements. It cannot
# be a context-free process either since we need to write results to an existing results file.
session = InstrumentSession(session_name, filename, detailed)
try:
yield None
finally:
session.close()
def instrument(fn=None, *, size_from=None, has_finegrained=False):
"""Decorator that marks specific functions as targets for instrumentation.
Instrumentation is only performed when enabled by a session.
Args:
fn (Callable): function to instrument
size_from (int | None): optional index indicating from which result to measure program size
(by number of newlines in the string representation of the result)
has_finegrained (bool): whether to instrument finegrained steps in the compiler and runtime.
If ``False``, only high-level steps such as program capture and
compilation are reported.
**Example**
.. code-block:: python
@instrument
def expensive_function(a, b):
return a + b
@qjit
def fn(x):
y = jnp.sin(x) ** 3
z = expensive_function(x, y)
return jnp.cos(z)
>>> with catalyst.debug.instrumentation("session_name"):
... fn(0.43)
[DIAGNOSTICS] Running expensive_function walltime: 1.908 ms cputime: 1.904 ms
[DIAGNOSTICS] Running capture walltime: 6.691 ms cputime: 6.686 ms programsize: 5 lines
[DIAGNOSTICS] Running generate_ir walltime: 5.938 ms cputime: 5.934 ms programsize: 18 lines
[DIAGNOSTICS] Running compile walltime: 41.826 ms cputime: 13.287 ms programsize: 128 lines
[DIAGNOSTICS] Running run walltime: 0.801 ms cputime: 0.795 ms
"""
if fn is None:
return functools.partial(instrument, size_from=size_from, has_finegrained=has_finegrained)
stage_name = getattr(fn, "__name__", "UNKNOWN")
@functools.wraps(fn)
def wrapper(*args, **kwargs):
if not InstrumentSession.active:
return fn(*args, **kwargs)
with ResultReporter(stage_name, has_finegrained) as reporter:
self = args[0]
orig = copy.deepcopy(self.compile_options)
self.compile_options.keep_intermediate = True
fn_results, wall_time, cpu_time = time_function(fn, args, kwargs)
program_size = measure_program_size(fn_results, size_from)
reporter.commit_results(wall_time, cpu_time, program_size)
self.compile_options = orig
return fn_results
return wrapper
## DATA COLLECTION ##
def time_function(fn, args, kwargs):
"""Collect timing information for a function call.
Args:
fn (Callable): function to time
args (Tuple): positional function arguments
kwargs (Dict): keyword function arguments
Returns:
Any: function results
int: wall time
int: cpu time
"""
# Unclear which order is better here since they can't be measured at the same time.
start_wall = time.perf_counter_ns()
start_cpu = time.process_time_ns()
results = fn(*args, **kwargs)
stop_cpu = time.process_time_ns()
stop_wall = time.perf_counter_ns()
return results, stop_wall - start_wall, stop_cpu - start_cpu
def measure_program_size(results, size_from):
"""Collect program size information by counting the number of newlines in the textual form
of a given program representation. The representation is assumed to be provided in the
instrumented function results at the provided index.
Args:
results (Sequence): instrumented function results
size_from (int | None): result index to use for size measurement
Returns:
int: program size
"""
if size_from is None:
return None
if not isinstance(results, typing.Sequence):
results = (results,)
return str(results[size_from]).count("\n")
## REPORTING ##
class ResultReporter:
"""Report the result of a single instrumentation stage. Reporting is done either to the console
or to a specified file obtained from the instrumentation session. The report is appended to the
file in that case.
The functionality is implemented as a stateful context manager since the (high-level)
instrumentation results are only available after the compiler has reported the low-level
instrumentation results. We thus need to open the report file first to obtain to correct
position to insert the results after the function was measured.
"""
def __init__(self, stage_name, has_finegrained):
self.stage_name = stage_name
self.has_finegrained = has_finegrained
self.enable_finegrained = InstrumentSession.finegrained
self.insertion_point = None
self.filename = InstrumentSession.filename
def __enter__(self):
"""Save the report file insertion point before low-level results are written to it."""
if self.filename:
with open(self.filename, mode="a", encoding="UTF-8") as file:
self.insertion_point = file.tell()
return self
def __exit__(self, *_):
return False
def commit_results(self, wall_time, cpu_time, program_size):
"""Report the provided results to either the console or file."""
if self.filename:
self.dump_results(wall_time, cpu_time, program_size)
else:
self.print_results(wall_time, cpu_time, program_size)
def print_results(self, wall_time, cpu_time, program_size=None):
"""Print the provided results to console with some minor formatting."""
if self.enable_finegrained:
print(f"[DIAGNOSTICS] > Total {self.stage_name.ljust(23)}", end="\t", file=sys.stderr)
else:
print(f"[DIAGNOSTICS] Running {self.stage_name.ljust(23)}", end="\t", file=sys.stderr)
formatted_wall_time = (str(wall_time // 1e3 / 1e3) + " ms").ljust(12)
print(f"walltime: {formatted_wall_time}", end="\t", file=sys.stderr)
formatted_cpu_time = (str(cpu_time // 1e3 / 1e3) + " ms").ljust(12)
print(f"cputime: {formatted_cpu_time}", end="\t", file=sys.stderr)
if program_size is not None:
print(f"programsize: {program_size} lines", end="", file=sys.stderr)
print(end="\n", file=sys.stderr)
def dump_results(self, wall_time, cpu_time, program_size=None):
"""Dump the provided results to file accounting for (potential) low-level instrumentation
results for the same stage."""
with open(self.filename, mode="r+", encoding="UTF-8") as file:
file.seek(self.insertion_point)
existing_text = file.read() # from low-level instrumentation
file.seek(self.insertion_point)
file.write(f" - {self.stage_name}:\n")
file.write(f" walltime: {wall_time / 1e6}\n")
file.write(f" cputime: {cpu_time / 1e6}\n")
if program_size is not None:
file.write(f" programsize: {program_size}\n")
if self.has_finegrained and self.enable_finegrained:
file.write(f" finegrained:\n")
file.write(existing_text)
@staticmethod
def dump_header(session_name):
"""Write the session header to file, including timestamp, session name, and system info."""
filename = InstrumentSession.filename
current_time = datetime.datetime.now()
with open(filename, mode="a", encoding="UTF-8") as file:
file.write(f"\n{current_time}:\n")
file.write(f" name: {session_name}\n")
file.write(f" system:\n")
file.write(f" os: {platform.platform(terse=True)}\n")
file.write(f" arch: {platform.machine()}\n")
file.write(f" python: {platform.python_version()}\n")
file.write(f" results:\n")
## SESSION ##
class InstrumentSession:
"""Provides access to global state during instrumentation and sets up / tears down
the environment used by the C++ instrumentation code. To be used as a context manager."""
active = False
filename = None
finegrained = False
def __init__(self, session_name, filename, detailed):
self.enable_flag = os.environ.pop("ENABLE_DIAGNOSTICS", None)
self.path_flag = os.environ.pop("DIAGNOSTICS_RESULTS_PATH", None)
self.open_session(session_name, filename, detailed)
def close(self):
"""Terminate the instrumentation session."""
self.close_session(self.enable_flag, self.path_flag)
@staticmethod
def open_session(session_name, filename, detailed):
"""Open an instrumentation session. Sets the global state and env variables."""
InstrumentSession.active = True
InstrumentSession.filename = filename
InstrumentSession.finegrained = detailed
if detailed:
os.environ["ENABLE_DIAGNOSTICS"] = "ON"
if filename:
os.environ["DIAGNOSTICS_RESULTS_PATH"] = filename
ResultReporter.dump_header(session_name)
@staticmethod
def close_session(enable_flag, path_flag):
"""Close an instrumentation session. Resets the global state and env variables."""
InstrumentSession.active = False
InstrumentSession.filename = None
InstrumentSession.finegrained = False
if enable_flag is None:
os.environ.pop("ENABLE_DIAGNOSTICS", None) # safely delete
else:
os.environ["ENABLE_DIAGNOSTICS"] = enable_flag
if path_flag is None:
os.environ.pop("ENABLE_DIAGNOSTICS", None) # safely delete
else:
os.environ["DIAGNOSTICS_RESULTS_PATH"] = path_flag
_modules/catalyst/debug/instruments
Download Python script
Download Notebook
View on GitHub