Skip to content

Logging

cartpole.log

Level

Classic log levels: UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL

Source code in cartpole/log.py
class Level(enum.IntEnum):
    """
    Log levels: `UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`.

    Members are integers with ascending values from `UNKNOWN` (0) to
    `FATAL` (5), so they can be compared and converted with `int(level)`.
    """

    UNKNOWN = 0
    DEBUG = 1
    INFO = 2
    WARNING = 3
    ERROR = 4
    FATAL = 5

MCAPLogger

Logger to mcap file.

Example
with MCAPLogger(log_path='log.mcap') as log:
    obj = ... # some pydantic object
    log.publish('/topic', obj, stamp)
    log.info('message')
Source code in cartpole/log.py
class MCAPLogger:
    """
    Logger to mcap file.

    Pydantic objects are written as JSON messages together with their JSON
    schema; text messages are written to the log topic registered in
    `__init__`.

    Example
    -------
    ```python
    with MCAPLogger(log_path='log.mcap') as log:
        obj = ... # some pydantic object
        log.publish('/topic', obj, stamp)
        log.info('message', stamp)
    ```

    """

    def __init__(self, log_path: str, level: Level=Level.INFO, compress=True):
        """
        Parameters
        ----------
        log_path: str
            path to mcap log file
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
        compress: bool
            enable compression (ZSTD), otherwise chunks are not compressed
        """

        self._pylog = get_pylogger('cartpole.mcap', level)

        # Keep the handle: the mcap writer does not close the stream it is
        # given, so it has to be closed explicitly in `close()`.
        self._file = open(log_path, "wb")

        try:
            self._writer = Writer(
                self._file,
                compression=CompressionType.ZSTD if compress else CompressionType.NONE,
            )

            self._writer.start()
            # topic name -> registration (schema name + channel id)
            self._topic_to_registration: Dict[str, Registration] = {}

            # preventive topic creation
            self.registration_log = self._register(
                FOXGLOVE_LOG_TOPIC,
                FOXGLOVE_LOG_MSG_TYPE,
                FOXGLOVE_LOGM_SG_SCHEMA)
        except Exception:
            # do not leak the file handle when initialization fails
            self._file.close()
            raise

    def __enter__(self):
        return self

    def _register(self, topic_name: str, name: str, schema: str) -> Registration:
        """
        Register schema and channel for a topic, or return the cached registration.

        A topic may only be registered with a single schema name; registering
        it again with another name fails the assertion.
        """
        if topic_name in self._topic_to_registration:
            cached = self._topic_to_registration[topic_name]
            assert cached.name == name, f'Topic {topic_name} registered with {cached.name}'
            return cached

        schema_id = self._writer.register_schema(
            name=name,
            encoding="jsonschema",
            data=schema.encode())

        channel_id = self._writer.register_channel(
            schema_id=schema_id,
            topic=topic_name,
            message_encoding="json")

        cached = Registration(name=name, channel_id=channel_id)
        self._topic_to_registration[topic_name] = cached
        self._pylog.debug('id=%i topic=\'%s\', type=\'%s\'', channel_id, topic_name, name)

        return cached

    def _register_class(self, topic_name: str, cls: Any) -> Registration:
        """
        Register a pydantic model class for a topic using its JSON schema.
        """
        name = cls.__name__
        # f-string is required here, otherwise `{name}` is printed literally
        assert issubclass(cls, BaseModel), f'Required pydantic model, but got {name}'
        return self._register(topic_name, name, json.dumps(cls.model_json_schema()))

    def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
        """
        Publish object to topic.

        Parameters
        ----------
        topic_name: str
            topic name
        obj: BaseModel
            object to dump (pydantic model)
        stamp: float
            timestamp, converted with `to_ns` before writing
            (NOTE(review): previously documented as nanoseconds, but the
            `to_ns` conversion suggests seconds - confirm)
        """

        registration = self._register_class(topic_name, type(obj))
        stamp_ns = to_ns(stamp)

        self._writer.add_message(
            channel_id=registration.channel_id,
            log_time=stamp_ns,
            data=obj.model_dump_json().encode(),
            publish_time=stamp_ns,
        )

    def log(self, msg: str, stamp: float, level: Level) -> None:
        """
        Print message to topic `/log`.

        Parameters
        ----------
        msg: str
            message to print
        stamp: float
            timestamp, converted with `to_stamp` / `to_ns` before writing
            (NOTE(review): previously documented as nanoseconds - confirm units)
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
        """

        sec, nsec = to_stamp(stamp)
        stamp_ns = to_ns(stamp)

        # "name", "file" and "line" are fixed placeholder values
        obj = {
            "timestamp": {"sec": sec, "nsec": nsec},
            "level": int(level),
            "message": msg,
            "name": "cartpole",
            "file": "/dev/null",
            "line": 0
        }

        self._writer.add_message(
                channel_id=self.registration_log.channel_id,
                log_time=stamp_ns,
                data=json.dumps(obj).encode(),
                publish_time=stamp_ns)

    def debug(self, msg: str, stamp: float) -> None:
        """
        Print message to topic `/log` with `DEBUG` level.
        """
        self.log(msg, stamp, Level.DEBUG)

    def info(self, msg: str, stamp: float) -> None:
        """
        Print message to topic `/log` with `INFO` level.
        """
        self.log(msg, stamp, Level.INFO)

    def warning(self, msg: str, stamp: float) -> None:
        """
        Print message to topic `/log` with `WARNING` level.
        """
        self.log(msg, stamp, Level.WARNING)

    def error(self, msg: str, stamp: float) -> None:
        """
        Print message to topic `/log` with `ERROR` level.
        """
        self.log(msg, stamp, Level.ERROR)

    def fatal(self, msg: str, stamp: float) -> None:
        """
        Print message to topic `/log` with `FATAL` level.
        """
        self.log(msg, stamp, Level.FATAL)

    def close(self):
        """
        Free log resources: finish the mcap writer and close the file.

        Calling `close` again after the file has been closed is a no-op.
        """
        if self._file.closed:
            return

        try:
            self._writer.finish()
        finally:
            # the file is closed even if finishing the writer fails
            self._file.close()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

__init__(log_path: str, level: Level = Level.INFO, compress=True)

PARAMETER DESCRIPTION
log_path

path to mcap log file

TYPE: str

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: Level DEFAULT: INFO

compress

enable compression

DEFAULT: True

Source code in cartpole/log.py
def __init__(self, log_path: str, level: Level=Level.INFO, compress=True):
    """
    Open the mcap file for writing and register the log topic up front.

    Parameters
    ----------
    log_path: str
        path to mcap log file
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`),
        passed to the `cartpole.mcap` python logger
    compress: bool
        use ZSTD compression when true, no compression otherwise
    """

    self._pylog = get_pylogger('cartpole.mcap', level)

    if compress:
        compression = CompressionType.ZSTD
    else:
        compression = CompressionType.NONE

    # NOTE(review): the file object is not stored anywhere, so nothing in
    # this class can close it later - confirm the writer takes care of it.
    self._writer = Writer(open(log_path, "wb"), compression=compression)
    self._writer.start()

    # topic name -> registration
    self._topic_to_registration: Dict[str, Registration] = {}

    # the log topic is registered eagerly, before any message is written
    self.registration_log = self._register(
        FOXGLOVE_LOG_TOPIC, FOXGLOVE_LOG_MSG_TYPE, FOXGLOVE_LOGM_SG_SCHEMA)

publish(topic_name: str, obj: BaseModel, stamp: float) -> None

Publish object to topic.

Parameters:

topic_name: topic name; obj (BaseModel): object to dump (pydantic model); stamp (float): timestamp in nanoseconds (float)

Source code in cartpole/log.py
def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
    """
    Publish object to topic.

    The topic is registered with the class of `obj` on first use; the object
    is written as JSON.

    Parameters
    ----------
    topic_name: str
        topic name
    obj: BaseModel
        object to dump (pydantic model)
    stamp: float
        timestamp, converted with `to_ns` before writing
        (NOTE(review): previously documented as nanoseconds, but the
        `to_ns` conversion suggests seconds - confirm)
    """

    registration = self._register_class(topic_name, type(obj))
    # log time and publish time are the same value, convert only once
    stamp_ns = to_ns(stamp)

    self._writer.add_message(
        channel_id=registration.channel_id,
        log_time=stamp_ns,
        data=obj.model_dump_json().encode(),
        publish_time=stamp_ns,
    )

log(msg: str, stamp: float, level: Level) -> None

Print message to topic /log.

Parameters:

msg (str): message to print; stamp (float): timestamp in nanoseconds (float); level (Level): log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

Source code in cartpole/log.py
def log(self, msg: str, stamp: float, level: Level) -> None:
    """
    Write a text message to topic `/log`.

    Parameters
    ----------
    msg: str
        message to print
    stamp: float
        timestamp, converted with `to_stamp` / `to_ns` before writing
        (NOTE(review): previously documented as nanoseconds - confirm units)
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    seconds, nanos = to_stamp(stamp)
    time_ns = to_ns(stamp)

    # "name", "file" and "line" are fixed placeholder values
    record = {
        "timestamp": {"sec": seconds, "nsec": nanos},
        "level": int(level),
        "message": msg,
        "name": "cartpole",
        "file": "/dev/null",
        "line": 0,
    }
    payload = json.dumps(record).encode()

    self._writer.add_message(
        channel_id=self.registration_log.channel_id,
        log_time=time_ns,
        data=payload,
        publish_time=time_ns,
    )

debug(msg: str, stamp: float) -> None

Print message to topic /log with DEBUG level.

Source code in cartpole/log.py
def debug(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `DEBUG` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.DEBUG)

info(msg: str, stamp: float) -> None

Print message to topic /log with INFO level.

Source code in cartpole/log.py
def info(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `INFO` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.INFO)

warning(msg: str, stamp: float) -> None

Print message to topic /log with WARNING level.

Source code in cartpole/log.py
def warning(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `WARNING` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.WARNING)

error(msg: str, stamp: float) -> None

Print message to topic /log with ERROR level.

Source code in cartpole/log.py
def error(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `ERROR` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.ERROR)

fatal(msg: str, stamp: float) -> None

Print message to topic /log with FATAL level.

Source code in cartpole/log.py
def fatal(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `FATAL` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.FATAL)

close()

Free log resources.

Source code in cartpole/log.py
def close(self):
    """
    Free log resources by finishing the mcap writer.

    NOTE(review): only the writer is finished here; the file object opened
    in `__init__` is not closed explicitly - confirm this is intended.
    """
    writer = self._writer
    writer.finish()

FoxgloveWebsocketLogger

Logger to foxglove websocket, messages are available in real time. Class will start new thread with asyncio event loop.

Example
with FoxgloveWebsocketLogger() as log:
    obj = ... # some pydantic object
    log.publish('/topic', obj, stamp)
    log.info('message')
Source code in cartpole/log.py
class FoxgloveWebsocketLogger:
    """
    Logger to foxglove websocket, messages are available in real time.
    Class will start new thread with asyncio event loop.

    Example
    -------
    ```
    with FoxgloveWebsocketLogger() as log:
        obj = ... # some pydantic object
        log.publish('/topic', obj, stamp)
        log.info('message', stamp)
    ```
    """

    def __init__(self, level: int = Level.INFO):
        """
        Create the event loop and queues, then start the worker thread.

        Parameters
        ----------
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`),
            handed over to `foxglove_main`
        """

        self._loop = asyncio.new_event_loop()
        # items to send, filled by `publish`
        self._input_queue = asyncio.Queue()
        # exceptions are read from here when the worker is not running
        self._exception_queue = asyncio.Queue()
        # set by `close` to ask the worker to stop
        self._stop = Event()

        self._foxlgove_thread = Thread(
            target=foxglove_main,
            name='foxglove_main_loop',
            daemon=True,
            args=(self._loop, self._input_queue, self._stop, self._exception_queue, level),
        )
        self._foxlgove_thread.start()

    def __enter__(self):
        return self

    def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
        """
        Publish object to topic.

        The item `(topic_name, stamp, obj)` is put to the input queue on the
        worker event loop. If the loop or the thread is not running, a queued
        exception is raised when there is one, `AssertionError` otherwise.

        Parameters
        ----------
        topic_name: str
            topic name
        obj: BaseModel
            object to dump (pydantic model)
        stamp: float
            timestamp (documented as nanoseconds - TODO confirm units)
        """

        running = self._loop.is_running() and self._foxlgove_thread.is_alive()

        if not running:
            if self._exception_queue.empty():
                raise AssertionError('Foxglove logger is not running')

            raise self._exception_queue.get_nowait()

        pending = self._input_queue.put((topic_name, stamp, obj))
        asyncio.run_coroutine_threadsafe(pending, self._loop)

    def log(self, msg: str, stamp: float, level: int) -> None:
        """
        Print message to topic `/log`.

        Parameters
        ----------
        msg: str
            message to print
        stamp: float
            timestamp (documented as nanoseconds - TODO confirm units)
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
        """

        # NOTE(review): `level` is not used, and the topic string contains
        # literal backticks - confirm this is what the worker expects.
        log_topic = "`/log`"
        self.publish(log_topic, msg, stamp)

    def debug(self, msg: str, stamp: float) -> None:
        """
        Shortcut for `log` with `DEBUG` level.
        """
        self.log(msg, stamp, level=Level.DEBUG)

    def info(self, msg: str, stamp: float) -> None:
        """
        Shortcut for `log` with `INFO` level.
        """
        self.log(msg, stamp, level=Level.INFO)

    def warning(self, msg: str, stamp: float) -> None:
        """
        Shortcut for `log` with `WARNING` level.
        """
        self.log(msg, stamp, level=Level.WARNING)

    def error(self, msg: str, stamp: float) -> None:
        """
        Shortcut for `log` with `ERROR` level.
        """
        self.log(msg, stamp, level=Level.ERROR)

    def fatal(self, msg: str, stamp: float) -> None:
        """
        Shortcut for `log` with `FATAL` level.
        """
        self.log(msg, stamp, level=Level.FATAL)

    def close(self):
        """
        Free log resources: set the stop event and wait for the worker thread.
        """

        stop_flag = self._stop
        stop_flag.set()

        worker = self._foxlgove_thread
        worker.join()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

__init__(level: int = Level.INFO)

PARAMETER DESCRIPTION
level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: int DEFAULT: INFO

Source code in cartpole/log.py
def __init__(self, level: int = Level.INFO):
    """
    Create the event loop and queues, then start the worker thread.

    Parameters
    ----------
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`),
        handed over to `foxglove_main`
    """

    self._loop = asyncio.new_event_loop()
    # items to send to the worker
    self._input_queue = asyncio.Queue()
    # presumably filled by the worker on failure - TODO confirm in `foxglove_main`
    self._exception_queue = asyncio.Queue()
    # stop request for the worker
    self._stop = Event()

    self._foxlgove_thread = Thread(
        target=foxglove_main,
        name='foxglove_main_loop',
        daemon=True,
        args=(self._loop, self._input_queue, self._stop, self._exception_queue, level),
    )
    self._foxlgove_thread.start()

publish(topic_name: str, obj: BaseModel, stamp: float) -> None

Publish object to topic.

PARAMETER DESCRIPTION
topic_name

topic name

TYPE: str

obj

object to dump (pydantic model)

TYPE: BaseModel

stamp

timestamp in nanoseconds (float)

TYPE: float

Source code in cartpole/log.py
def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
    """
    Publish object to topic.

    The item `(topic_name, stamp, obj)` is put to the input queue on the
    worker event loop. If the loop or the thread is not running, a queued
    exception is raised when there is one, `AssertionError` otherwise.

    Parameters
    ----------
    topic_name: str
        topic name
    obj: BaseModel
        object to dump (pydantic model)
    stamp: float
        timestamp (documented as nanoseconds - TODO confirm units)
    """

    running = self._loop.is_running() and self._foxlgove_thread.is_alive()

    if not running:
        if self._exception_queue.empty():
            raise AssertionError('Foxglove logger is not running')

        raise self._exception_queue.get_nowait()

    pending = self._input_queue.put((topic_name, stamp, obj))
    asyncio.run_coroutine_threadsafe(pending, self._loop)

log(msg: str, stamp: float, level: int) -> None

Print message to topic /log.

PARAMETER DESCRIPTION
msg

message to print

TYPE: str

stamp

timestamp in nanoseconds (float)

TYPE: float

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: int

Source code in cartpole/log.py
def log(self, msg: str, stamp: float, level: int) -> None:
    """
    Print message to topic `/log`.

    Parameters
    ----------
    msg: str
        message to print
    stamp: float
        timestamp (documented as nanoseconds - TODO confirm units)
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    # NOTE(review): `level` is not used, and the topic string contains
    # literal backticks - confirm this is what the consumer expects.
    log_topic = "`/log`"
    self.publish(log_topic, msg, stamp)

debug(msg: str, stamp: float) -> None

Print message to topic /log with DEBUG level.

Source code in cartpole/log.py
def debug(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `DEBUG` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.DEBUG)

info(msg: str, stamp: float) -> None

Print message to topic /log with INFO level.

Source code in cartpole/log.py
def info(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `INFO` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.INFO)

warning(msg: str, stamp: float) -> None

Print message to topic /log with WARNING level.

Source code in cartpole/log.py
def warning(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `WARNING` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.WARNING)

error(msg: str, stamp: float) -> None

Print message to topic /log with ERROR level.

Source code in cartpole/log.py
def error(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `ERROR` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.ERROR)

fatal(msg: str, stamp: float) -> None

Print message to topic /log with FATAL level.

Source code in cartpole/log.py
def fatal(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `FATAL` level (topic `/log`).
    """
    self.log(msg, stamp, level=Level.FATAL)

close()

Free log resources.

Source code in cartpole/log.py
def close(self):
    """
    Free log resources: set the stop event and wait for the worker thread.
    """

    stop_flag = self._stop
    stop_flag.set()

    worker = self._foxlgove_thread
    worker.join()

Logger

Compound Logger class that logs to console, foxglove and mcap.

Example
with Logger(log_path='log.mcap', level=INFO) as log:
    obj = ... # some pydantic object

    log.publish('/topic', obj)
    log.info('message')
Source code in cartpole/log.py
class Logger:
    """
    Compound Logger class that logs to console, foxglove and mcap.

    Example
    -------
    ```
    with Logger(log_path='log.mcap', level=Level.INFO) as log:
        obj = ... # some pydantic object

        log.publish('/topic', obj, stamp)
        log.info('message', stamp)
    ```
    """

    def __init__(self, log_path: str = '', level: Level = Level.INFO):
        """
        Parameters
        ----------
        log_path: str
            path to mcap log file, if not provided, no mcap log will be created
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
        """

        self._pylog = get_pylogger('cartpole', level)
        # NOTE(review): the foxglove logger is created with its default
        # level, `level` is not forwarded - confirm this is intended.
        self._foxglove_log = FoxgloveWebsocketLogger()
        self._mcap_log = None

        if log_path:
            self._mcap_log = MCAPLogger(log_path, level=level)

    def __enter__(self):
        # required for the `with Logger(...) as log:` usage shown above
        return self

    def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
        """
        Publish object to topic of the mcap log (if enabled) and foxglove.

        Parameters
        ----------
        topic_name: str
            topic name
        obj: BaseModel
            pydantic object
        stamp: float
            timestamp, required here
            (documented as nanoseconds - TODO confirm units)
        """

        if self._mcap_log:
            self._mcap_log.publish(topic_name, obj, stamp)

        self._foxglove_log.publish(topic_name, obj, stamp)

    def log(self, msg: str, stamp: float, level: Level = Level.INFO) -> None:
        """
        Print message to console and topic `/log`.

        Parameters
        ----------
        msg: str
            message to print
        stamp: float
            timestamp, required here; printed to console with 3 decimals
            (documented as nanoseconds - TODO confirm units)
        level: Level
            log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
        """

        self._pylog.log(pylog_level(level), f'{stamp:.3f}: {msg}')
        self._foxglove_log.log(msg, stamp, level)

        if self._mcap_log:
            self._mcap_log.log(msg, stamp, level)

    def debug(self, msg: str, stamp: float) -> None:
        """
        Print message to console and topic `/log` with `DEBUG` level.
        """
        # must go through `log`: `info` does not accept a level argument
        self.log(msg, stamp, Level.DEBUG)

    def info(self, msg: str, stamp: float) -> None:
        """
        Print message to console and topic `/log` with `INFO` level.
        """
        self.log(msg, stamp, Level.INFO)

    def warning(self, msg: str, stamp: float) -> None:
        """
        Print message to console and topic `/log` with `WARNING` level.
        """
        self.log(msg, stamp, Level.WARNING)

    def error(self, msg: str, stamp: float) -> None:
        """
        Print message to console and topic `/log` with `ERROR` level.
        """
        self.log(msg, stamp, Level.ERROR)

    def fatal(self, msg: str, stamp: float) -> None:
        """
        Print message to console and topic `/log` with `FATAL` level.
        """
        self.log(msg, stamp, Level.FATAL)

    def close(self):
        """
        Free log resources: close the foxglove logger, then the mcap logger.
        """

        try:
            self._foxglove_log.close()
        finally:
            # the mcap log is closed even if closing foxglove fails
            if self._mcap_log:
                self._mcap_log.close()

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # The context manager protocol passes three exception arguments;
        # defaults keep a direct `__exit__()` call working as before.
        self.close()

__init__(log_path: str = '', level: Level = Level.INFO)

PARAMETER DESCRIPTION
log_path

path to mcap log file, if not provided, no mcap log will be created

TYPE: str DEFAULT: ''

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: Level DEFAULT: INFO

Source code in cartpole/log.py
def __init__(self, log_path: str = '', level: Level = Level.INFO):
    """
    Create the console and foxglove loggers and, optionally, the mcap logger.

    Parameters
    ----------
    log_path: str
        path to mcap log file, if not provided, no mcap log will be created
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    self._pylog = get_pylogger('cartpole', level)
    # NOTE(review): created with its default level, `level` is not forwarded
    self._foxglove_log = FoxgloveWebsocketLogger()
    # empty path means no mcap logging
    self._mcap_log = MCAPLogger(log_path, level=level) if log_path else None

publish(topic_name: str, obj: BaseModel, stamp: float) -> None

PARAMETER DESCRIPTION
topic_name

topic name

TYPE: str

obj

pydantic object

TYPE: BaseModel

stamp

timestamp in nanoseconds (float), if not provided, current time used

TYPE: float

Source code in cartpole/log.py
def publish(self, topic_name: str, obj: BaseModel, stamp: float) -> None:
    """
    Publish object to topic of the mcap log (if enabled) and foxglove.

    Parameters
    ----------
    topic_name: str
        topic name
    obj: BaseModel
        pydantic object
    stamp: float
        timestamp, required here
        (documented as nanoseconds - TODO confirm units)
    """

    mcap = self._mcap_log
    if mcap:
        mcap.publish(topic_name, obj, stamp)

    foxglove = self._foxglove_log
    foxglove.publish(topic_name, obj, stamp)

log(msg: str, stamp: float, level: Level = Level.INFO) -> None

Print message to console and topic /log.

PARAMETER DESCRIPTION
msg

message to print

TYPE: str

stamp

timestamp in nanoseconds (float), if not provided, current time used

TYPE: float

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: Level DEFAULT: INFO

Source code in cartpole/log.py
def log(self, msg: str, stamp: float, level: Level = Level.INFO) -> None:
    """
    Print message to console and topic `/log`.

    Parameters
    ----------
    msg: str
        message to print
    stamp: float
        timestamp, required here; printed to console with 3 decimals
        (documented as nanoseconds - TODO confirm units)
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    # console first
    console_level = pylog_level(level)
    self._pylog.log(console_level, f'{stamp:.3f}: {msg}')

    # then foxglove and, when enabled, mcap
    self._foxglove_log.log(msg, stamp, level)

    mcap = self._mcap_log
    if mcap:
        mcap.log(msg, stamp, level)

debug(msg: str, stamp: float) -> None

Print message to console and topic /log with DEBUG level.

Source code in cartpole/log.py
def debug(self, msg: str, stamp: float) -> None:
    """
    Print message to console and topic `/log` with `DEBUG` level.
    """
    # must go through `log`: `info` takes only (msg, stamp), so passing a
    # level to it raised TypeError
    self.log(msg, stamp, Level.DEBUG)

info(msg: str, stamp: float) -> None

Print message to console and topic /log with INFO level.

Source code in cartpole/log.py
def info(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `INFO` level (console and topic `/log`).
    """
    self.log(msg, stamp, level=Level.INFO)

warning(msg: str, stamp: float) -> None

Print message to console and topic /log with WARNING level.

Source code in cartpole/log.py
def warning(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `WARNING` level (console and topic `/log`).
    """
    self.log(msg, stamp, level=Level.WARNING)

error(msg: str, stamp: float) -> None

Print message to console and topic /log with ERROR level.

Source code in cartpole/log.py
def error(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `ERROR` level (console and topic `/log`).
    """
    self.log(msg, stamp, level=Level.ERROR)

fatal(msg: str, stamp: float) -> None

Print message to console and topic /log with FATAL level.

Source code in cartpole/log.py
def fatal(self, msg: str, stamp: float) -> None:
    """
    Shortcut for `log` with `FATAL` level (console and topic `/log`).
    """
    self.log(msg, stamp, level=Level.FATAL)

close()

Free log resources.

Source code in cartpole/log.py
def close(self):
    """
    Free log resources: close the foxglove logger, then the mcap logger.

    The mcap logger (if any) is closed even when closing the foxglove
    logger raises; the exception is propagated afterwards.
    """

    try:
        self._foxglove_log.close()
    finally:
        if self._mcap_log:
            self._mcap_log.close()

setup(log_path: str = '', level: Level = Level.INFO) -> None

Set up the global logger.

PARAMETER DESCRIPTION
log_path

path to mcap log file, if not provided, no mcap log will be created

TYPE: str DEFAULT: ''

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: Level DEFAULT: INFO

Source code in cartpole/log.py
def setup(log_path: str = '', level: Level = Level.INFO) -> None:
    """
    Set up the global logger.

    The current global logger (if any) is closed first, then a new `Logger`
    is created and stored as the global instance.

    Parameters
    ----------
    log_path: str
        path to mcap log file, if not provided, no mcap log will be created
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    global __logger

    # the old logger is closed before the new one is created
    close()

    replacement = Logger(log_path=log_path, level=level)
    __logger = replacement

close()

Close global logger

Source code in cartpole/log.py
def close():
    """
    Close the global logger and forget it; does nothing when none is set.
    """

    global __logger

    if not __logger:
        return

    __logger.close()
    __logger = None

get_logger() -> Logger

Get global logger instance

Source code in cartpole/log.py
def get_logger() -> Logger:
    """
    Get global logger instance, it is created with default settings on first use.
    """

    global __logger

    if __logger:
        return __logger

    # no logger yet: `setup()` creates one with default arguments
    setup()
    return __logger

publish(topic_name: str, obj: BaseModel, stamp: float | None = None) -> None

Publish object to topic of global logger. If logger is not set, it will be created with default settings.

PARAMETER DESCRIPTION
topic_name

topic name

TYPE: str

obj

pydantic model

TYPE: BaseModel

stamp

timestamp in nanoseconds (float), if not provided, current time used

TYPE: float | None DEFAULT: None

Source code in cartpole/log.py
def publish(topic_name: str, obj: BaseModel, stamp: float|None = None) -> None:
    """
    Publish object to topic of global logger.
    If logger is not set, it will be created with default settings.

    Parameters
    ----------
    topic_name: str
        topic name
    obj: BaseModel
        pydantic model
    stamp: float
        timestamp, resolved with `this_or_now` when not provided
        (documented as nanoseconds, current time by default - TODO confirm)
    """

    logger = get_logger()
    resolved_stamp = this_or_now(stamp)
    logger.publish(topic_name, obj, resolved_stamp)

log(msg: str, stamp: float | None = None, level: Level = Level.INFO) -> None

Print message to console and topic /log. If logger is not set, it will be created with default settings.

PARAMETER DESCRIPTION
msg

message to print

TYPE: str

stamp

timestamp in nanoseconds (float), if not provided, current time used

TYPE: float | None DEFAULT: None

level

log level (UNKNOWN, DEBUG, INFO, WARNING, ERROR, FATAL)

TYPE: Level DEFAULT: INFO

Source code in cartpole/log.py
def log(msg: str, stamp: float|None = None, level: Level = Level.INFO) -> None:
    """
    Print message to console and topic `/log` using the global logger.
    If logger is not set, it will be created with default settings.

    Parameters
    ----------
    msg: str
        message to print
    stamp: float
        timestamp, resolved with `this_or_now` when not provided
        (documented as nanoseconds, current time by default - TODO confirm)
    level: Level
        log level (`UNKNOWN`, `DEBUG`, `INFO`, `WARNING`, `ERROR`, `FATAL`)
    """

    logger = get_logger()
    resolved_stamp = this_or_now(stamp)
    logger.log(msg, resolved_stamp, level)

debug(msg: str, stamp: float | None = None) -> None

Print message to console and topic /log with DEBUG level. If logger is not set, it will be created with default settings.

Source code in cartpole/log.py
def debug(msg: str, stamp: float|None = None) -> None:
    """
    Shortcut for the global `log` with `DEBUG` level.
    If logger is not set, it will be created with default settings.
    """
    log(msg, stamp=stamp, level=Level.DEBUG)

info(msg: str, stamp: float | None = None) -> None

Print message to console and topic /log with INFO level. If logger is not set, it will be created with default settings.

Source code in cartpole/log.py
def info(msg: str, stamp: float|None = None) -> None:
    """
    Shortcut for the global `log` with `INFO` level.
    If logger is not set, it will be created with default settings.
    """
    log(msg, stamp=stamp, level=Level.INFO)

warning(msg: str, stamp: float | None = None) -> None

Print message to console and topic /log with WARNING level. If logger is not set, it will be created with default settings.

Source code in cartpole/log.py
def warning(msg: str, stamp: float|None = None) -> None:
    """
    Shortcut for the global `log` with `WARNING` level.
    If logger is not set, it will be created with default settings.
    """
    log(msg, stamp=stamp, level=Level.WARNING)

error(msg: str, stamp: float | None = None) -> None

Print message to console and topic /log with ERROR level. If logger is not set, it will be created with default settings.

Source code in cartpole/log.py
def error(msg: str, stamp: float|None = None) -> None:
    """
    Shortcut for the global `log` with `ERROR` level.
    If logger is not set, it will be created with default settings.
    """
    log(msg, stamp=stamp, level=Level.ERROR)

fatal(msg: str, stamp: float | None = None) -> None

Print message to console and topic /log with FATAL level. If logger is not set, it will be created with default settings.

Source code in cartpole/log.py
def fatal(msg: str, stamp: float|None = None) -> None:
    """
    Shortcut for the global `log` with `FATAL` level.
    If logger is not set, it will be created with default settings.
    """
    log(msg, stamp=stamp, level=Level.FATAL)