Package rtms

Functions

def configure_logger(options: dict)
Expand source code
def configure_logger(options: dict):
    """Configure the logger with the specified options"""
    global _log_level, _log_format, _log_enabled

    if 'level' in options:
        _log_level = options['level']

    if 'format' in options:
        _log_format = options['format']

    if 'enabled' in options:
        _log_enabled = options['enabled']

    log_info('logger', f"Logger configured: level={_log_level}, format={_log_format}, enabled={_log_enabled}")

    log_debug("rtms", "Importing _rtms module")

Configure the logger with the specified options

def generate_signature(client, secret, uuid, rtms_stream_id)
Expand source code
def generate_signature(client, secret, uuid, rtms_stream_id):
    """Generate a signature for RTMS authentication"""
    client_id = os.getenv("ZM_RTMS_CLIENT", client)
    client_secret = os.getenv("ZM_RTMS_SECRET", secret)

    if not client_id:
        raise EnvironmentError("Client ID cannot be blank")
    elif not client_secret:
        raise EnvironmentError("Client Secret cannot be blank")

    message = f"{client_id},{uuid},{rtms_stream_id}"

    signature = hmac.new(
        client_secret.encode('utf-8'),
        message.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    return signature

Generate a signature for RTMS authentication

def initialize(ca_path=None)
Expand source code
def initialize(ca_path=None):
    """
    Initialize the RTMS SDK.

    Note: The SDK is automatically initialized when you create a Client instance.
    You only need to call this if you want to initialize with a custom CA path.

    Args:
        ca_path (str, optional): Path to the CA certificate file.

    Returns:
        bool: True if initialization was successful
    """
    ca_path = find_ca_certificate(ca_path)
    try:
        _ClientBase.initialize(ca_path)
        Client._sdk_initialized = True
        return True
    except Exception as e:
        log_error("rtms", f"Error initializing RTMS SDK: {e}")
        return False

Initialize the RTMS SDK.

Note: The SDK is automatically initialized when you create a Client instance. You only need to call this if you want to initialize with a custom CA path.

Args

ca_path : str, optional
Path to the CA certificate file.

Returns

bool
True if initialization was successful
def log_debug(component, message, details=None)
Expand source code
def log_debug(component, message, details=None):
    """Log a debug message"""
    _log(LogLevel.DEBUG, component, message, details)

Log a debug message

def log_error(component, message, details=None)
Expand source code
def log_error(component, message, details=None):
    """Log an error message"""
    _log(LogLevel.ERROR, component, message, details)

Log an error message

def log_info(component, message, details=None)
Expand source code
def log_info(component, message, details=None):
    """Log an info message"""
    _log(LogLevel.INFO, component, message, details)

Log an info message

def log_warn(component, message, details=None)
Expand source code
def log_warn(component, message, details=None):
    """Log a warning message"""
    _log(LogLevel.WARN, component, message, details)

Log a warning message

def onWebhookEvent(callback=None, port=None, path=None)
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Start a webhook server to receive events from Zoom.

    This function creates an HTTP server that listens for webhook events from Zoom.
    When a webhook event is received, it parses the JSON payload and passes it to
    the provided callback function.

    Can be used as a decorator or a direct function call:

    @rtms.onWebhookEvent(port=8080, path='/webhook')
    def handle_webhook(payload):
        if payload.get('event') == 'meeting.rtms.started':
            # Create a client and join
            client = rtms.Client()
            client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: Decorator function if used as a decorator
    """
    global _webhook_server

    # Determine port and path
    webhook_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    webhook_path = path or os.getenv('ZM_RTMS_PATH', '/')

    # If used as a decorator without arguments
    if callback is not None and callable(callback):
        if _webhook_server is None:
            _webhook_server = WebhookServer(webhook_port, webhook_path)
        _webhook_server.start(callback)
        return callback

    # If used as a decorator with arguments or as a method call
    def decorator(func):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(webhook_port, webhook_path)
        _webhook_server.start(func)
        return func

    return decorator

Start a webhook server to receive events from Zoom.

This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.

Can be used as a decorator or a direct function call:

@rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(…)

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def on_webhook_event(callback=None, port=None, path=None)
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Start a webhook server to receive events from Zoom.

    This function creates an HTTP server that listens for webhook events from Zoom.
    When a webhook event is received, it parses the JSON payload and passes it to
    the provided callback function.

    Can be used as a decorator or a direct function call:

    @rtms.onWebhookEvent(port=8080, path='/webhook')
    def handle_webhook(payload):
        if payload.get('event') == 'meeting.rtms.started':
            # Create a client and join
            client = rtms.Client()
            client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: Decorator function if used as a decorator
    """
    global _webhook_server

    # Determine port and path
    webhook_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    webhook_path = path or os.getenv('ZM_RTMS_PATH', '/')

    # If used as a decorator without arguments
    if callback is not None and callable(callback):
        if _webhook_server is None:
            _webhook_server = WebhookServer(webhook_port, webhook_path)
        _webhook_server.start(callback)
        return callback

    # If used as a decorator with arguments or as a method call
    def decorator(func):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(webhook_port, webhook_path)
        _webhook_server.start(func)
        return func

    return decorator

Start a webhook server to receive events from Zoom.

This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.

Can be used as a decorator or a direct function call:

@rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(…)

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def run(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None)
Expand source code
def run(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None):
    """
    Start the default RTMS event loop (blocking).

    Clients that call join() without an explicit EventLoop are automatically
    routed to this default loop. Blocks until interrupted or stop() is called.

    Args:
        poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
        stop_on_empty: Stop automatically when all clients have left
        executor: Optional concurrent.futures.Executor for dispatching data
            callbacks on all clients that don't have their own executor set.

    Example::

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client()
            client.on_transcript_data(lambda d,s,t,m: print(m.userName, d))
            client.join(payload['payload'])

        rtms.run()   # blocks until Ctrl-C
    """
    global _default_loop, _running, _run_executor
    _run_executor = executor
    _default_loop = EventLoop(poll_interval=poll_interval, name='rtms-default')
    _running = True
    _stop_event.clear()
    try:
        _default_loop.run(stop_on_empty=stop_on_empty)
    finally:
        _running = False
        _default_loop = None
        _run_executor = None

Start the default RTMS event loop (blocking).

Clients that call join() without an explicit EventLoop are automatically routed to this default loop. Blocks until interrupted or stop() is called.

Args

poll_interval
Seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty
Stop automatically when all clients have left
executor
Optional concurrent.futures.Executor for dispatching data callbacks on all clients that don't have their own executor set.

Example::

@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client()
    client.on_transcript_data(lambda d,s,t,m: print(m.userName, d))
    client.join(payload['payload'])

rtms.run()   # blocks until Ctrl-C
async def run_async(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None)
Expand source code
async def run_async(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None):
    """
    Start the default RTMS event loop as an asyncio coroutine.

    Drop-in async replacement for rtms.run(). Yields to the asyncio event loop
    between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg) run freely.

    Args:
        poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
        stop_on_empty: Stop automatically when all clients have left
        executor: Optional concurrent.futures.Executor for dispatching data
            callbacks on all clients that don't have their own executor set.

    Example::

        async def main():
            await asyncio.gather(rtms.run_async(), aiohttp_app.start())

        asyncio.run(main())
    """
    global _default_loop, _running, _run_executor
    _run_executor = executor
    _default_loop = EventLoop(poll_interval=poll_interval, name='rtms-default')
    _running = True
    _stop_event.clear()
    try:
        await _default_loop.run_async(stop_on_empty=stop_on_empty)
    finally:
        _running = False
        _default_loop = None
        _run_executor = None

Start the default RTMS event loop as an asyncio coroutine.

Drop-in async replacement for rtms.run(). Yields to the asyncio event loop between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg) run freely.

Args

poll_interval
Seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty
Stop automatically when all clients have left
executor
Optional concurrent.futures.Executor for dispatching data callbacks on all clients that don't have their own executor set.

Example::

async def main():
    await asyncio.gather(rtms.run_async(), aiohttp_app.start())

asyncio.run(main())
def stop()
Expand source code
def stop():
    """
    Stop the default event loop (rtms.run() or rtms.run_async()).

    For EventLoop or EventLoopPool instances, call their .stop() method directly.
    """
    global _running
    _running = False
    _stop_event.set()
    if _default_loop:
        _default_loop.stop()
    log_info('rtms', 'Stop signal received')

Stop the default event loop (rtms.run() or rtms.run_async()).

For EventLoop or EventLoopPool instances, call their .stop() method directly.

def uninitialize()
Expand source code
def uninitialize():
    """
    Uninitialize the RTMS SDK.

    Call this when you're done using the SDK to release resources.
    """
    try:
        _ClientBase.uninitialize()
        Client._sdk_initialized = False
        return True
    except Exception as e:
        log_error("rtms", f"Error uninitializing RTMS SDK: {e}")
        traceback.print_exc()
        return False

Uninitialize the RTMS SDK.

Call this when you're done using the SDK to release resources.

Classes

class AiInterpreter (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop channelNum
prop lid
prop sampleRate
prop targets
prop timestamp
class AiTargetLanguage (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop engine
prop lid
prop toneId
prop voiceId
class AudioChannel (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var MONO

The type of the None singleton.

var STEREO

The type of the None singleton.

class AudioCodec (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var G711

The type of the None singleton.

var G722

The type of the None singleton.

var L16

The type of the None singleton.

var OPUS

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

class AudioContentType (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var FILE_STREAM

The type of the None singleton.

var RAW_AUDIO

The type of the None singleton.

var RTP

The type of the None singleton.

var TEXT

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

class AudioParams (...)

init(args, *kwargs) Overloaded function.

  1. init(self: rtms._rtms.AudioParams) -> None

  2. init(self: rtms._rtms.AudioParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, sample_rate: typing.SupportsInt | typing.SupportsIndex, channel: typing.SupportsInt | typing.SupportsIndex, data_opt: typing.SupportsInt | typing.SupportsIndex, duration: typing.SupportsInt | typing.SupportsIndex, frame_size: typing.SupportsInt | typing.SupportsIndex) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop channel
prop codec
prop contentType
prop content_type
prop dataOpt
prop data_opt
prop duration
prop frameSize
prop frame_size
prop sampleRate
prop sample_rate
class AudioSampleRate (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var SR_16K

The type of the None singleton.

var SR_32K

The type of the None singleton.

var SR_48K

The type of the None singleton.

var SR_8K

The type of the None singleton.

class Client (executor=None)
Expand source code
class Client(_ClientBase):
    """
    RTMS Client - provides real-time media streaming capabilities

    This client allows you to join Zoom meetings and process audio, video,
    and transcript data in real-time.
    """

    _sdk_initialized = False

    def __init__(self, executor=None):
        """Initialize a new RTMS client.

        Args:
            executor: Optional concurrent.futures.Executor for dispatching data callbacks
                (audio, video, transcript, deskshare) to a thread pool. When set, callbacks
                are submitted via executor.submit() instead of running inline. Pass
                concurrent.futures.ThreadPoolExecutor(n) for CPU-bound or I/O-heavy callbacks.
        """
        # super().__init__() is PyClient() — intentionally a no-op at construction
        # time. The C SDK handle is allocated lazily in _do_alloc_and_join(), which
        # runs on the owning EventLoop's thread. This satisfies the C SDK's thread
        # affinity requirement: alloc/join/poll/release must share one OS thread.
        super().__init__()
        self._polling_interval = 10  # milliseconds
        self._running = False
        self._webhook_server = None
        self._executor = executor  # concurrent.futures.Executor or None
        self._loop = None           # asyncio event loop captured at callback-registration time

        # EventLoop that owns this client's lifecycle (set by loop.add() or auto-created)
        self._assigned_loop: Optional['EventLoop'] = None
        # Pending join params — stored until the loop's thread calls _do_alloc_and_join()
        self._pending_join_params: Optional[dict] = None

        # Individual video subscription callbacks
        self._participant_video_callback = None
        self._video_subscribed_callback = None

        # Shared event dispatcher state (matches Node.js setupEventHandler pattern)
        self._event_handler_registered = False
        self._participant_event_callback = None
        self._active_speaker_callback = None
        self._sharing_callback = None
        self._media_interrupted_callback = None
        self._raw_event_callback = None

        # Register with global client registry
        with _clients_lock:
            _clients[id(self)] = self

    def join(self,
             meeting_uuid: str = None,
             webinar_uuid: str = None,
             session_id: str = None,
             engagement_id: str = None,
             rtms_stream_id: str = None,
             server_urls: str = None,
             signature: str = None,
             timeout: int = -1,
             ca: str = None,
             client: str = None,
             secret: str = None,
             poll_interval: int = 10,
             **kwargs):
        """
        Join a RTMS session.

        Can be called with positional arguments or with a dictionary of parameters.

        Args:
            meeting_uuid (str): Meeting UUID (for Meeting SDK events)
            webinar_uuid (str): Webinar UUID (for Webinar events)
            session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
            engagement_id (str): Engagement ID (for ZCC events) - used when meeting_uuid is not provided
            rtms_stream_id (str): RTMS stream ID
            server_urls (str): Server URLs (comma-separated)
            signature (str, optional): Authentication signature. If not provided, will be generated
            timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
            ca (str, optional): CA certificate path. Defaults to system CA files.
            client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
            secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
            poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
            **kwargs: Additional arguments passed to join

        Returns:
            bool: True if joined successfully, False otherwise
        """

        # Normalise all call forms into a single params dict
        params = {
            'meeting_uuid': meeting_uuid,
            'webinar_uuid': webinar_uuid,
            'session_id': session_id,
            'engagement_id': engagement_id,
            'rtms_stream_id': rtms_stream_id,
            'server_urls': server_urls,
            'signature': signature,
            'timeout': timeout,
            'ca': ca,
            'client': client,
            'secret': secret,
            'poll_interval': poll_interval,
        }
        if kwargs:
            params.update(kwargs)
        if isinstance(meeting_uuid, dict):
            params = dict(meeting_uuid)

        # Store params for the loop thread to consume
        self._pending_join_params = params

        # If the client has been assigned to an explicit EventLoop, it will be
        # picked up by that loop's _drain_pending() call — nothing else to do.
        if self._assigned_loop is not None:
            log_debug("client", "join() deferred to assigned EventLoop thread")
            return True

        # If rtms.run() / rtms.run_async() is active, route to the default loop.
        if _default_loop is not None:
            log_debug("client", "join() routed to default EventLoop")
            _default_loop.add(self)
            return True

        # Zero-config (Tier 0): no loop assigned and no run() active —
        # create an implicit single-client EventLoop as a background daemon thread.
        log_debug("client", "No EventLoop assigned — creating implicit single-client loop")
        implicit_loop = EventLoop(
            poll_interval=params.get('poll_interval', 10) / 1000.0,
            name='rtms-implicit',
        )
        implicit_loop.add(self)
        implicit_loop.start()
        return True

    def _do_alloc_and_join(self) -> None:
        """
        Called by EventLoop._drain_pending() on the loop's own thread.

        Performs the two operations that must share an OS thread:
          1. alloc()  — creates the C SDK handle (rtms_alloc)
          2. join()   — registers callbacks and connects (rtms_set_callbacks + rtms_join)
        """
        params = self._pending_join_params
        if params is None:
            raise RuntimeError("_do_alloc_and_join called with no pending join params")

        try:
            meeting_uuid  = params.get('meeting_uuid')
            webinar_uuid  = params.get('webinar_uuid')
            session_id    = params.get('session_id')
            engagement_id = params.get('engagement_id')
            rtms_stream_id = params.get('rtms_stream_id')
            server_urls   = params.get('server_urls')
            signature     = params.get('signature')
            timeout       = params.get('timeout', -1)
            client_id     = params.get('client', os.getenv('ZM_RTMS_CLIENT'))
            secret        = params.get('secret', os.getenv('ZM_RTMS_SECRET'))
            poll_interval = params.get('poll_interval', 10)

            instance_id = meeting_uuid or webinar_uuid or session_id or engagement_id
            if not instance_id:
                raise ValueError("meeting_uuid, webinar_uuid, session_id, or engagement_id is required")
            if not rtms_stream_id:
                raise ValueError("rtms_stream_id is required")
            if not server_urls:
                raise ValueError("server_urls is required")

            if not signature:
                signature = generate_signature(client_id, secret, instance_id, rtms_stream_id)

            self._polling_interval = poll_interval

            # Phase 1: ensure SDK is initialized on THIS thread (same thread as alloc/join).
            # The lock ensures init() runs exactly once even if multiple EventLoops start
            # simultaneously, but the actual C call only happens on the first client's thread.
            with _sdk_init_lock:
                if not Client._sdk_initialized:
                    ca_path = find_ca_certificate()
                    log_debug("client", f"Initializing SDK with CA: {ca_path}")
                    try:
                        _ClientBase.initialize(ca_path, 1, "python-rtms")
                    except Exception:
                        log_debug("client", "Trying SDK initialization with empty CA path")
                        _ClientBase.initialize("", 1, "python-rtms")
                    Client._sdk_initialized = True
                    log_debug("client", "SDK initialized successfully")

            # Phase 2: allocate C SDK handle on this thread
            super().alloc()

            session_type = 'meeting' if meeting_uuid else 'webinar' if webinar_uuid else 'engagement' if engagement_id else 'session'
            log_info("client", f"Joining {session_type}: {instance_id}")

            # join() on the same thread as alloc() — C SDK constraint satisfied
            super().join(instance_id, rtms_stream_id, signature, server_urls, timeout)

            self._running = True
            log_info("client", "Successfully joined")

        except Exception as e:
            log_error("client", f"Error in _do_alloc_and_join: {e}")
            traceback.print_exc()
            raise

    def _initialize_rtms(self, ca_path=None):
        """Initialize the RTMS SDK with the best available CA certificate"""
        try:
            # Find the best CA certificate
            ca_path = find_ca_certificate(ca_path)

            # Initialize the SDK
            log_debug("client", f"Initializing RTMS with CA: {ca_path}")
            _ClientBase.initialize(ca_path)
            return True
        except Exception as e:
            log_error("client", f"Error initializing RTMS: {e}")
            traceback.print_exc()
            # Try with an empty path as a last resort
            try:
                log_debug("client", "Trying initialization with empty CA path")
                _ClientBase.initialize("")
                return True
            except Exception as e2:
                log_error("client", f"Failed to initialize with empty CA path: {e2}")
                raise e  # Raise the original error

    def poll(self):
        """Poll the C SDK for pending events. Called by the owning EventLoop's thread."""
        if self._running:
            try:
                super().poll()
            except Exception as e:
                log_error("client", f"Error during polling: {e}")

    # ========================================================================
    # Callback Dispatch
    # ========================================================================

    def _wrap_callback(self, callback):
        """Wrap a callback for executor or asyncio dispatch.

        - sync + no executor  → returned unchanged (v1.0 inline behavior)
        - sync + executor     → submitted to executor.submit() on each call
        - async coroutine     → scheduled on the captured asyncio event loop
        """
        if callback is None:
            return None
        if inspect.iscoroutinefunction(callback):
            # Capture the running loop now (at registration time) if one exists.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            self._loop = loop
            def async_wrapper(*args):
                _loop = self._loop
                if _loop and _loop.is_running():
                    asyncio.run_coroutine_threadsafe(callback(*args), _loop)
                else:
                    try:
                        asyncio.run(callback(*args))
                    except RuntimeError:
                        pass
            return async_wrapper
        executor = self._executor or _run_executor
        if executor is not None:
            def executor_wrapper(*args):
                executor.submit(callback, *args)
            return executor_wrapper
        return callback

    # ========================================================================
    # Data Callbacks (Python-level so _wrap_callback applies and aliases work)
    # ========================================================================

    def on_audio_data(self, callback) -> None:
        """Register audio data callback. Supports executor and async coroutines."""
        super().on_audio_data(self._wrap_callback(callback))

    onAudioData = on_audio_data

    def on_video_data(self, callback) -> None:
        """Register video data callback. Supports executor and async coroutines."""
        super().on_video_data(self._wrap_callback(callback))

    onVideoData = on_video_data

    def on_deskshare_data(self, callback) -> None:
        """Register deskshare data callback. Supports executor and async coroutines."""
        super().on_deskshare_data(self._wrap_callback(callback))

    onDeskshareData = on_deskshare_data

    def on_transcript_data(self, callback) -> None:
        """Register transcript data callback. Supports executor and async coroutines."""
        super().on_transcript_data(self._wrap_callback(callback))

    onTranscriptData = on_transcript_data

    # ========================================================================
    # Context Manager
    # ========================================================================

    def __enter__(self):
        """Support `with rtms.Client() as client:` usage."""
        return self

    def __exit__(self, *_):
        """Call leave() on context exit. Exceptions are not suppressed."""
        self.leave()
        return False

    def stop(self):
        """
        Stop RTMS client and release resources.

        This is equivalent to calling leave(), but with a more intuitive name.
        """
        return self.leave()

    def set_audio_params(self, params):
        """
        Set audio parameters with validation.

        Args:
            params (AudioParams): Audio parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_audio_params(params)
        return super().set_audio_params(params)

    # camelCase legacy alias
    setAudioParams = set_audio_params

    def set_video_params(self, params):
        """
        Set video parameters with validation.

        Args:
            params (VideoParams): Video parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_video_params(params)
        return super().set_video_params(params)

    # camelCase legacy alias
    setVideoParams = set_video_params

    def set_deskshare_params(self, params):
        """
        Set deskshare parameters with validation.

        Args:
            params (DeskshareParams): Deskshare parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_deskshare_params(params)
        return super().set_deskshare_params(params)

    # camelCase legacy alias
    setDeskshareParams = set_deskshare_params

    def set_transcript_params(self, params):
        """
        Set transcript parameters.

        Args:
            params (TranscriptParams): Transcript parameters object

        Returns:
            bool: True if parameters were set successfully
        """
        return super().set_transcript_params(params)

    # camelCase legacy alias
    setTranscriptParams = set_transcript_params

    def set_proxy(self, proxy_type: str, proxy_url: str) -> None:
        """Configure a proxy for SDK connections.

        Args:
            proxy_type (str): Proxy protocol type (e.g. 'http', 'https').
            proxy_url (str): Full proxy URL including host and port.
        """
        return super().set_proxy(proxy_type, proxy_url)

    # camelCase legacy alias
    setProxy = set_proxy

    def subscribe_video(self, user_id: int, subscribe: bool) -> None:
        """Subscribe or unsubscribe from an individual participant's video stream.

        Args:
            user_id (int): The participant's user ID.
            subscribe (bool): True to subscribe, False to unsubscribe.
        """
        return super().subscribe_video(user_id, subscribe)

    subscribeVideo = subscribe_video

    def on_participant_video(self, callback) -> None:
        """Register a callback for participant video state changes.

        The callback receives (users: list[int], is_on: bool).
        """
        self._participant_video_callback = callback
        super().on_participant_video(callback)

    onParticipantVideo = on_participant_video

    def on_video_subscribed(self, callback) -> None:
        """Register a callback for video subscription responses.

        The callback receives (user_id: int, status: int, error: str).
        """
        self._video_subscribed_callback = callback
        super().on_video_subscribed(callback)

    onVideoSubscribed = on_video_subscribed

    def subscribe_event(self, events):
        """
        Subscribe to receive specific event types.

        Note: Calling on_participant_event() automatically subscribes to
        EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

        Args:
            events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

        Returns:
            bool: True if subscription was successful
        """
        return super().subscribe_event(events)

    # camelCase legacy alias
    subscribeEvent = subscribe_event

    def unsubscribe_event(self, events):
        """
        Unsubscribe from specific event types.

        Args:
            events: List of event type constants

        Returns:
            bool: True if unsubscription was successful
        """
        return super().unsubscribe_event(events)

    # camelCase legacy alias
    unsubscribeEvent = unsubscribe_event

    def _setup_event_handler(self):
        """
        Internal shared event dispatcher that routes events to typed callbacks.
        Matches the Node.js setupEventHandler() pattern. Only registers once.
        """
        if self._event_handler_registered:
            return
        self._event_handler_registered = True

        def event_dispatcher(event_data: str):
            if self._raw_event_callback:
                self._raw_event_callback(event_data)
            try:
                data = json.loads(event_data)
                event_type = data.get('event_type')

                if event_type == EVENT_PARTICIPANT_JOIN:
                    if self._participant_event_callback:
                        participants = [
                            {'user_id': p.get('user_id'), 'user_name': p.get('user_name')}
                            for p in data.get('participants', [])
                        ]
                        self._participant_event_callback('join', data.get('timestamp', 0), participants)

                elif event_type == EVENT_PARTICIPANT_LEAVE:
                    if self._participant_event_callback:
                        participants = [
                            {'user_id': p.get('user_id'), 'user_name': p.get('user_name')}
                            for p in data.get('participants', [])
                        ]
                        self._participant_event_callback('leave', data.get('timestamp', 0), participants)

                elif event_type == EVENT_ACTIVE_SPEAKER_CHANGE:
                    if self._active_speaker_callback:
                        self._active_speaker_callback(
                            data.get('timestamp', 0),
                            data.get('user_id', 0),
                            data.get('user_name', '')
                        )

                elif event_type == EVENT_SHARING_START:
                    if self._sharing_callback:
                        self._sharing_callback(
                            'start',
                            data.get('timestamp', 0),
                            data.get('user_id'),
                            data.get('user_name')
                        )

                elif event_type == EVENT_SHARING_STOP:
                    if self._sharing_callback:
                        self._sharing_callback(
                            'stop',
                            data.get('timestamp', 0),
                            None,
                            None
                        )

                elif event_type == EVENT_MEDIA_CONNECTION_INTERRUPTED:
                    if self._media_interrupted_callback:
                        self._media_interrupted_callback(data.get('timestamp', 0))

            except Exception as e:
                log_error('client', f'Failed to parse event: {e}')

        super().on_event_ex(event_dispatcher)

    def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool:
        """
        Register a callback for participant join/leave events.

        This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE.
        Events are delivered as parsed objects, not raw JSON.

        Args:
            callback: Function called with (event, timestamp, participants) where:
                - event: 'join' or 'leave'
                - timestamp: Unix timestamp in milliseconds
                - participants: List of dicts with 'user_id' and optional 'user_name'

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_participant(event, timestamp, participants):
            ...     print(f"Participant {event}: {participants}")
            >>> client.on_participant_event(on_participant)
        """
        self._participant_event_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to participant events: {e}')
        return True

    # camelCase legacy alias
    onParticipantEvent = on_participant_event

    def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool:
        """
        Register a callback for active speaker change events.

        This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

        Args:
            callback: Function called with (timestamp, user_id, user_name)

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_speaker(timestamp, user_id, user_name):
            ...     print(f"Active speaker: {user_name} ({user_id})")
            >>> client.on_active_speaker_event(on_speaker)
        """
        self._active_speaker_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')
        return True

    # camelCase legacy alias
    onActiveSpeakerEvent = on_active_speaker_event

    def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
        """
        Register a callback for sharing start/stop events.

        This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP.
        Note: These events only work when the RTMS app has DESKSHARE scope permission.

        Args:
            callback: Function called with (event, timestamp, user_id, user_name) where:
                - event: 'start' or 'stop'
                - timestamp: Unix timestamp in milliseconds
                - user_id: Optional user ID (only for 'start')
                - user_name: Optional user name (only for 'start')

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_sharing(event, timestamp, user_id, user_name):
            ...     print(f"Sharing {event} by {user_name}")
            >>> client.on_sharing_event(on_sharing)
        """
        self._sharing_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')
        return True

    # camelCase legacy alias
    onSharingEvent = on_sharing_event

    def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool:
        """
        Register a callback for media connection interrupted events.

        This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

        Args:
            callback: Function called with (timestamp,) when the media connection is interrupted

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_interrupted(timestamp):
            ...     print(f"Media connection interrupted at {timestamp}")
            >>> client.on_media_connection_interrupted(on_interrupted)
        """
        self._media_interrupted_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}')
        return True

    # camelCase legacy alias
    onMediaConnectionInterrupted = on_media_connection_interrupted

    def on_event_ex(self, callback: Callable[[str], None]) -> bool:
        """
        Register a callback for raw event data.

        This provides access to the raw JSON event data from the SDK.
        Use this when you need custom event handling or access to all event types.
        This callback is called IN ADDITION to typed callbacks, not instead of.

        Args:
            callback: Function called with raw JSON event data string

        Returns:
            bool: True if registration succeeds
        """
        self._raw_event_callback = callback
        self._setup_event_handler()
        return True

    # camelCase legacy alias
    onEventEx = on_event_ex

    def leave(self):
        """
        Leave the RTMS session.

        Signals the owning EventLoop to stop polling this client and releases
        the C SDK handle. Safe to call from any thread.

        Returns:
            bool: True if left successfully
        """
        log_info("client", "Leaving RTMS session")

        # Signal the EventLoop to stop polling this client
        self._running = False

        # Unregister from global client registry
        with _clients_lock:
            _clients.pop(id(self), None)

        # Stop webhook server if we have one
        if self._webhook_server:
            self._webhook_server.stop()
            self._webhook_server = None

        try:
            # Release RTMS resources
            super().release()
            return True
        except Exception as e:
            log_error("client", f"Error releasing RTMS resources: {e}")
            traceback.print_exc()
            return False

    def on_webhook_event(self, callback=None, port=None, path=None):
        """
        Register a webhook event handler.

        This can be used as a decorator or a direct method call:

        @client.on_webhook_event(port=8080, path='/webhook')
        def handle_webhook(payload):
            print(f"Received webhook: {payload}")

        Args:
            callback (callable, optional): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

        Returns:
            callable: Decorator function if used as a decorator
        """
        # If used as a decorator without arguments
        if callback is not None and callable(callback):
            # Start webhook server with provided callback
            self._start_webhook_server(callback)
            return callback

        # If used as a decorator with arguments or as a method call
        def decorator(func):
            self._start_webhook_server(func, port, path)
            return func

        return decorator

    def _start_webhook_server(self, callback, port=None, path=None):
        """
        Start the webhook server

        Args:
            callback (callable): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
        """
        if not self._webhook_server:
            port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
            path = path or os.getenv('ZM_RTMS_PATH', '/')

            self._webhook_server = WebhookServer(port, path)

        self._webhook_server.start(callback)

RTMS Client - provides real-time media streaming capabilities

This client allows you to join Zoom meetings and process audio, video, and transcript data in real-time.

Initialize a new RTMS client.

Args

executor
Optional concurrent.futures.Executor for dispatching data callbacks (audio, video, transcript, deskshare) to a thread pool. When set, callbacks are submitted via executor.submit() instead of running inline. Pass concurrent.futures.ThreadPoolExecutor(n) for CPU-bound or I/O-heavy callbacks.

Ancestors

  • rtms._rtms.Client
  • pybind11_builtins.pybind11_object

Methods

def join(self,
meeting_uuid: str = None,
webinar_uuid: str = None,
session_id: str = None,
engagement_id: str = None,
rtms_stream_id: str = None,
server_urls: str = None,
signature: str = None,
timeout: int = -1,
ca: str = None,
client: str = None,
secret: str = None,
poll_interval: int = 10,
**kwargs)
Expand source code
def join(self,
         meeting_uuid: str = None,
         webinar_uuid: str = None,
         session_id: str = None,
         engagement_id: str = None,
         rtms_stream_id: str = None,
         server_urls: str = None,
         signature: str = None,
         timeout: int = -1,
         ca: str = None,
         client: str = None,
         secret: str = None,
         poll_interval: int = 10,
         **kwargs):
    """
    Join a RTMS session.

    Can be called with positional arguments or with a dictionary of parameters.

    Args:
        meeting_uuid (str): Meeting UUID (for Meeting SDK events)
        webinar_uuid (str): Webinar UUID (for Webinar events)
        session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
        engagement_id (str): Engagement ID (for ZCC events) - used when meeting_uuid is not provided
        rtms_stream_id (str): RTMS stream ID
        server_urls (str): Server URLs (comma-separated)
        signature (str, optional): Authentication signature. If not provided, will be generated
        timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
        ca (str, optional): CA certificate path. Defaults to system CA files.
        client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
        secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
        poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
        **kwargs: Additional arguments passed to join

    Returns:
        bool: True if joined successfully, False otherwise
    """

    # Normalise all call forms into a single params dict
    params = {
        'meeting_uuid': meeting_uuid,
        'webinar_uuid': webinar_uuid,
        'session_id': session_id,
        'engagement_id': engagement_id,
        'rtms_stream_id': rtms_stream_id,
        'server_urls': server_urls,
        'signature': signature,
        'timeout': timeout,
        'ca': ca,
        'client': client,
        'secret': secret,
        'poll_interval': poll_interval,
    }
    if kwargs:
        params.update(kwargs)
    if isinstance(meeting_uuid, dict):
        params = dict(meeting_uuid)

    # Store params for the loop thread to consume
    self._pending_join_params = params

    # If the client has been assigned to an explicit EventLoop, it will be
    # picked up by that loop's _drain_pending() call — nothing else to do.
    if self._assigned_loop is not None:
        log_debug("client", "join() deferred to assigned EventLoop thread")
        return True

    # If rtms.run() / rtms.run_async() is active, route to the default loop.
    if _default_loop is not None:
        log_debug("client", "join() routed to default EventLoop")
        _default_loop.add(self)
        return True

    # Zero-config (Tier 0): no loop assigned and no run() active —
    # create an implicit single-client EventLoop as a background daemon thread.
    log_debug("client", "No EventLoop assigned — creating implicit single-client loop")
    implicit_loop = EventLoop(
        poll_interval=params.get('poll_interval', 10) / 1000.0,
        name='rtms-implicit',
    )
    implicit_loop.add(self)
    implicit_loop.start()
    return True

Join a RTMS session.

Can be called with positional arguments or with a dictionary of parameters.

Args

meeting_uuid : str
Meeting UUID (for Meeting SDK events)
webinar_uuid : str
Webinar UUID (for Webinar events)
session_id : str
Session ID (for Video SDK events) - used when meeting_uuid is not provided
engagement_id : str
Engagement ID (for ZCC events) - used when meeting_uuid is not provided
rtms_stream_id : str
RTMS stream ID
server_urls : str
Server URLs (comma-separated)
signature : str, optional
Authentication signature. If not provided, will be generated
timeout : int, optional
Timeout in milliseconds. Defaults to -1 (no timeout).
ca : str, optional
CA certificate path. Defaults to system CA files.
client : str, optional
Client ID. If empty, uses ZM_RTMS_CLIENT env var.
secret : str, optional
Client secret. If empty, uses ZM_RTMS_SECRET env var.
poll_interval : int, optional
Polling interval in milliseconds. Defaults to 10.
**kwargs
Additional arguments passed to join

Returns

bool
True if joined successfully, False otherwise
def leave(self)
Expand source code
def leave(self):
    """
    Leave the RTMS session.

    Signals the owning EventLoop to stop polling this client and releases
    the C SDK handle. Safe to call from any thread.

    Returns:
        bool: True if left successfully
    """
    log_info("client", "Leaving RTMS session")

    # Signal the EventLoop to stop polling this client
    self._running = False

    # Unregister from global client registry
    with _clients_lock:
        _clients.pop(id(self), None)

    # Stop webhook server if we have one
    if self._webhook_server:
        self._webhook_server.stop()
        self._webhook_server = None

    try:
        # Release RTMS resources
        super().release()
        return True
    except Exception as e:
        log_error("client", f"Error releasing RTMS resources: {e}")
        traceback.print_exc()
        return False

Leave the RTMS session.

Signals the owning EventLoop to stop polling this client and releases the C SDK handle. Safe to call from any thread.

Returns

bool
True if left successfully
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) ‑> bool
Expand source code
def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool:
    """
    Register a callback for active speaker change events.

    This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

    Args:
        callback: Function called with (timestamp, user_id, user_name)

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_speaker(timestamp, user_id, user_name):
        ...     print(f"Active speaker: {user_name} ({user_id})")
        >>> client.on_active_speaker_event(on_speaker)
    """
    self._active_speaker_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')
    return True

Register a callback for active speaker change events.

This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

Args

callback
Function called with (timestamp, user_id, user_name)

Returns

bool
True if registration succeeds

Example

>>> def on_speaker(timestamp, user_id, user_name):
...     print(f"Active speaker: {user_name} ({user_id})")
>>> client.on_active_speaker_event(on_speaker)
def onAudioData(self, callback) ‑> None
Expand source code
def on_audio_data(self, callback) -> None:
    """Register audio data callback. Supports executor and async coroutines."""
    super().on_audio_data(self._wrap_callback(callback))

Register audio data callback. Supports executor and async coroutines.

def onDeskshareData(self, callback) ‑> None
Expand source code
def on_deskshare_data(self, callback) -> None:
    """Register deskshare data callback. Supports executor and async coroutines."""
    super().on_deskshare_data(self._wrap_callback(callback))

Register deskshare data callback. Supports executor and async coroutines.

def onEventEx(self, callback: Callable[[str], None]) ‑> bool
Expand source code
def on_event_ex(self, callback: Callable[[str], None]) -> bool:
    """
    Register a callback for raw event data.

    This provides access to the raw JSON event data from the SDK.
    Use this when you need custom event handling or access to all event types.
    This callback is called IN ADDITION to typed callbacks, not instead of.

    Args:
        callback: Function called with raw JSON event data string

    Returns:
        bool: True if registration succeeds
    """
    self._raw_event_callback = callback
    self._setup_event_handler()
    return True

Register a callback for raw event data.

This provides access to the raw JSON event data from the SDK. Use this when you need custom event handling or access to all event types. This callback is called IN ADDITION to typed callbacks, not instead of.

Args

callback
Function called with raw JSON event data string

Returns

bool
True if registration succeeds
def onMediaConnectionInterrupted(self, callback: Callable[[int], None]) ‑> bool
Expand source code
def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool:
    """
    Register a callback for media connection interrupted events.

    This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

    Args:
        callback: Function called with (timestamp,) when the media connection is interrupted

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_interrupted(timestamp):
        ...     print(f"Media connection interrupted at {timestamp}")
        >>> client.on_media_connection_interrupted(on_interrupted)
    """
    self._media_interrupted_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}')
    return True

Register a callback for media connection interrupted events.

This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

Args

callback
Function called with (timestamp,) when the media connection is interrupted

Returns

bool
True if registration succeeds

Example

>>> def on_interrupted(timestamp):
...     print(f"Media connection interrupted at {timestamp}")
>>> client.on_media_connection_interrupted(on_interrupted)
def onParticipantEvent(self, callback: Callable[[str, int, list], None]) ‑> bool
Expand source code
def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool:
    """
    Register a callback for participant join/leave events.

    This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE.
    Events are delivered as parsed objects, not raw JSON.

    Args:
        callback: Function called with (event, timestamp, participants) where:
            - event: 'join' or 'leave'
            - timestamp: Unix timestamp in milliseconds
            - participants: List of dicts with 'user_id' and optional 'user_name'

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_participant(event, timestamp, participants):
        ...     print(f"Participant {event}: {participants}")
        >>> client.on_participant_event(on_participant)
    """
    self._participant_event_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to participant events: {e}')
    return True

Register a callback for participant join/leave events.

This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.

Args

callback
Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name'

Returns

bool
True if registration succeeds

Example

>>> def on_participant(event, timestamp, participants):
...     print(f"Participant {event}: {participants}")
>>> client.on_participant_event(on_participant)
def onParticipantVideo(self, callback) ‑> None
Expand source code
def on_participant_video(self, callback) -> None:
    """Register a callback for participant video state changes.

    The callback receives (users: list[int], is_on: bool).
    """
    self._participant_video_callback = callback
    super().on_participant_video(callback)

Register a callback for participant video state changes.

The callback receives (users: list[int], is_on: bool).

def onSharingEvent(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool
Expand source code
def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
    """
    Register a callback for sharing start/stop events.

    This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP.
    Note: These events only work when the RTMS app has DESKSHARE scope permission.

    Args:
        callback: Function called with (event, timestamp, user_id, user_name) where:
            - event: 'start' or 'stop'
            - timestamp: Unix timestamp in milliseconds
            - user_id: Optional user ID (only for 'start')
            - user_name: Optional user name (only for 'start')

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_sharing(event, timestamp, user_id, user_name):
        ...     print(f"Sharing {event} by {user_name}")
        >>> client.on_sharing_event(on_sharing)
    """
    self._sharing_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')
    return True

Register a callback for sharing start/stop events.

This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.

Args

callback
Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start')

Returns

bool
True if registration succeeds

Example

>>> def on_sharing(event, timestamp, user_id, user_name):
...     print(f"Sharing {event} by {user_name}")
>>> client.on_sharing_event(on_sharing)
def onTranscriptData(self, callback) ‑> None
Expand source code
def on_transcript_data(self, callback) -> None:
    """Register transcript data callback. Supports executor and async coroutines."""
    super().on_transcript_data(self._wrap_callback(callback))

Register transcript data callback. Supports executor and async coroutines.

def onVideoData(self, callback) ‑> None
Expand source code
def on_video_data(self, callback) -> None:
    """Register video data callback. Supports executor and async coroutines."""
    super().on_video_data(self._wrap_callback(callback))

Register video data callback. Supports executor and async coroutines.

def onVideoSubscribed(self, callback) ‑> None
Expand source code
def on_video_subscribed(self, callback) -> None:
    """Register a callback for video subscription responses.

    The callback receives (user_id: int, status: int, error: str).
    """
    self._video_subscribed_callback = callback
    super().on_video_subscribed(callback)

Register a callback for video subscription responses.

The callback receives (user_id: int, status: int, error: str).

def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) ‑> bool
Expand source code
def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool:
    """
    Register a callback for active speaker change events.

    This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

    Args:
        callback: Function called with (timestamp, user_id, user_name)

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_speaker(timestamp, user_id, user_name):
        ...     print(f"Active speaker: {user_name} ({user_id})")
        >>> client.on_active_speaker_event(on_speaker)
    """
    self._active_speaker_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')
    return True

Register a callback for active speaker change events.

This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

Args

callback
Function called with (timestamp, user_id, user_name)

Returns

bool
True if registration succeeds

Example

>>> def on_speaker(timestamp, user_id, user_name):
...     print(f"Active speaker: {user_name} ({user_id})")
>>> client.on_active_speaker_event(on_speaker)
def on_audio_data(self, callback) ‑> None
Expand source code
def on_audio_data(self, callback) -> None:
    """Register audio data callback. Supports executor and async coroutines."""
    super().on_audio_data(self._wrap_callback(callback))

Register audio data callback. Supports executor and async coroutines.

def on_deskshare_data(self, callback) ‑> None
Expand source code
def on_deskshare_data(self, callback) -> None:
    """Register deskshare data callback. Supports executor and async coroutines."""
    super().on_deskshare_data(self._wrap_callback(callback))

Register deskshare data callback. Supports executor and async coroutines.

def on_event_ex(self, callback: Callable[[str], None]) ‑> bool
Expand source code
def on_event_ex(self, callback: Callable[[str], None]) -> bool:
    """
    Register a callback for raw event data.

    This provides access to the raw JSON event data from the SDK.
    Use this when you need custom event handling or access to all event types.
    This callback is called IN ADDITION to typed callbacks, not instead of.

    Args:
        callback: Function called with raw JSON event data string

    Returns:
        bool: True if registration succeeds
    """
    self._raw_event_callback = callback
    self._setup_event_handler()
    return True

Register a callback for raw event data.

This provides access to the raw JSON event data from the SDK. Use this when you need custom event handling or access to all event types. This callback is called IN ADDITION to typed callbacks, not instead of.

Args

callback
Function called with raw JSON event data string

Returns

bool
True if registration succeeds
def on_media_connection_interrupted(self, callback: Callable[[int], None]) ‑> bool
Expand source code
def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool:
    """
    Register a callback for media connection interrupted events.

    This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

    Args:
        callback: Function called with (timestamp,) when the media connection is interrupted

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_interrupted(timestamp):
        ...     print(f"Media connection interrupted at {timestamp}")
        >>> client.on_media_connection_interrupted(on_interrupted)
    """
    self._media_interrupted_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}')
    return True

Register a callback for media connection interrupted events.

This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

Args

callback
Function called with (timestamp,) when the media connection is interrupted

Returns

bool
True if registration succeeds

Example

>>> def on_interrupted(timestamp):
...     print(f"Media connection interrupted at {timestamp}")
>>> client.on_media_connection_interrupted(on_interrupted)
def on_participant_event(self, callback: Callable[[str, int, list], None]) ‑> bool
Expand source code
def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool:
    """
    Register a callback for participant join/leave events.

    This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE.
    Events are delivered as parsed objects, not raw JSON.

    Args:
        callback: Function called with (event, timestamp, participants) where:
            - event: 'join' or 'leave'
            - timestamp: Unix timestamp in milliseconds
            - participants: List of dicts with 'user_id' and optional 'user_name'

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_participant(event, timestamp, participants):
        ...     print(f"Participant {event}: {participants}")
        >>> client.on_participant_event(on_participant)
    """
    self._participant_event_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to participant events: {e}')
    return True

Register a callback for participant join/leave events.

This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.

Args

callback
Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name'

Returns

bool
True if registration succeeds

Example

>>> def on_participant(event, timestamp, participants):
...     print(f"Participant {event}: {participants}")
>>> client.on_participant_event(on_participant)
def on_participant_video(self, callback) ‑> None
Expand source code
def on_participant_video(self, callback) -> None:
    """Register a callback for participant video state changes.

    The callback receives (users: list[int], is_on: bool).
    """
    self._participant_video_callback = callback
    super().on_participant_video(callback)

Register a callback for participant video state changes.

The callback receives (users: list[int], is_on: bool).

def on_sharing_event(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool
Expand source code
def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
    """
    Register a callback for sharing start/stop events.

    This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP.
    Note: These events only work when the RTMS app has DESKSHARE scope permission.

    Args:
        callback: Function called with (event, timestamp, user_id, user_name) where:
            - event: 'start' or 'stop'
            - timestamp: Unix timestamp in milliseconds
            - user_id: Optional user ID (only for 'start')
            - user_name: Optional user name (only for 'start')

    Returns:
        bool: True if registration succeeds

    Example:
        >>> def on_sharing(event, timestamp, user_id, user_name):
        ...     print(f"Sharing {event} by {user_name}")
        >>> client.on_sharing_event(on_sharing)
    """
    self._sharing_callback = callback
    self._setup_event_handler()
    try:
        self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP])
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')
    return True

Register a callback for sharing start/stop events.

This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.

Args

callback
Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start')

Returns

bool
True if registration succeeds

Example

>>> def on_sharing(event, timestamp, user_id, user_name):
...     print(f"Sharing {event} by {user_name}")
>>> client.on_sharing_event(on_sharing)
def on_transcript_data(self, callback) ‑> None
Expand source code
def on_transcript_data(self, callback) -> None:
    """Register transcript data callback. Supports executor and async coroutines."""
    super().on_transcript_data(self._wrap_callback(callback))

Register transcript data callback. Supports executor and async coroutines.

def on_video_data(self, callback) ‑> None
Expand source code
def on_video_data(self, callback) -> None:
    """Register video data callback. Supports executor and async coroutines."""
    super().on_video_data(self._wrap_callback(callback))

Register video data callback. Supports executor and async coroutines.

def on_video_subscribed(self, callback) ‑> None
Expand source code
def on_video_subscribed(self, callback) -> None:
    """Register a callback for video subscription responses.

    The callback receives (user_id: int, status: int, error: str).
    """
    self._video_subscribed_callback = callback
    super().on_video_subscribed(callback)

Register a callback for video subscription responses.

The callback receives (user_id: int, status: int, error: str).

def on_webhook_event(self, callback=None, port=None, path=None)
Expand source code
def on_webhook_event(self, callback=None, port=None, path=None):
    """
    Register a webhook event handler.

    This can be used as a decorator or a direct method call:

    @client.on_webhook_event(port=8080, path='/webhook')
    def handle_webhook(payload):
        print(f"Received webhook: {payload}")

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: Decorator function if used as a decorator
    """
    # If used as a decorator without arguments
    if callback is not None and callable(callback):
        # Start webhook server with provided callback
        self._start_webhook_server(callback)
        return callback

    # If used as a decorator with arguments or as a method call
    def decorator(func):
        self._start_webhook_server(func, port, path)
        return func

    return decorator

Register a webhook event handler.

This can be used as a decorator or a direct method call:

@client.on_webhook_event(port=8080, path='/webhook') def handle_webhook(payload): print(f"Received webhook: {payload}")

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def poll(self)
Expand source code
def poll(self):
    """Poll the C SDK for pending events. Called by the owning EventLoop's thread."""
    if self._running:
        try:
            super().poll()
        except Exception as e:
            log_error("client", f"Error during polling: {e}")

Poll the C SDK for pending events. Called by the owning EventLoop's thread.

def setAudioParams(self, params)
Expand source code
def set_audio_params(self, params):
    """
    Set audio parameters with validation.

    Args:
        params (AudioParams): Audio parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_audio_params(params)
    return super().set_audio_params(params)

Set audio parameters with validation.

Args

params : AudioParams
Audio parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def setDeskshareParams(self, params)
Expand source code
def set_deskshare_params(self, params):
    """
    Set deskshare parameters with validation.

    Args:
        params (DeskshareParams): Deskshare parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_deskshare_params(params)
    return super().set_deskshare_params(params)

Set deskshare parameters with validation.

Args

params : DeskshareParams
Deskshare parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def setProxy(self, proxy_type: str, proxy_url: str) ‑> None
Expand source code
def set_proxy(self, proxy_type: str, proxy_url: str) -> None:
    """Configure a proxy for SDK connections.

    Args:
        proxy_type (str): Proxy protocol type (e.g. 'http', 'https').
        proxy_url (str): Full proxy URL including host and port.
    """
    return super().set_proxy(proxy_type, proxy_url)

Configure a proxy for SDK connections.

Args

proxy_type : str
Proxy protocol type (e.g. 'http', 'https').
proxy_url : str
Full proxy URL including host and port.
def setTranscriptParams(self, params)
Expand source code
def set_transcript_params(self, params):
    """
    Set transcript parameters.

    Args:
        params (TranscriptParams): Transcript parameters object

    Returns:
        bool: True if parameters were set successfully
    """
    return super().set_transcript_params(params)

Set transcript parameters.

Args

params : TranscriptParams
Transcript parameters object

Returns

bool
True if parameters were set successfully
def setVideoParams(self, params)
Expand source code
def set_video_params(self, params):
    """
    Set video parameters with validation.

    Args:
        params (VideoParams): Video parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_video_params(params)
    return super().set_video_params(params)

Set video parameters with validation.

Args

params : VideoParams
Video parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def set_audio_params(self, params)
Expand source code
def set_audio_params(self, params):
    """
    Set audio parameters with validation.

    Args:
        params (AudioParams): Audio parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_audio_params(params)
    return super().set_audio_params(params)

Set audio parameters with validation.

Args

params : AudioParams
Audio parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def set_deskshare_params(self, params)
Expand source code
def set_deskshare_params(self, params):
    """
    Set deskshare parameters with validation.

    Args:
        params (DeskshareParams): Deskshare parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_deskshare_params(params)
    return super().set_deskshare_params(params)

Set deskshare parameters with validation.

Args

params : DeskshareParams
Deskshare parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def set_proxy(self, proxy_type: str, proxy_url: str) ‑> None
Expand source code
def set_proxy(self, proxy_type: str, proxy_url: str) -> None:
    """Configure a proxy for SDK connections.

    Args:
        proxy_type (str): Proxy protocol type (e.g. 'http', 'https').
        proxy_url (str): Full proxy URL including host and port.
    """
    return super().set_proxy(proxy_type, proxy_url)

Configure a proxy for SDK connections.

Args

proxy_type : str
Proxy protocol type (e.g. 'http', 'https').
proxy_url : str
Full proxy URL including host and port.
def set_transcript_params(self, params)
Expand source code
def set_transcript_params(self, params):
    """
    Set transcript parameters.

    Args:
        params (TranscriptParams): Transcript parameters object

    Returns:
        bool: True if parameters were set successfully
    """
    return super().set_transcript_params(params)

Set transcript parameters.

Args

params : TranscriptParams
Transcript parameters object

Returns

bool
True if parameters were set successfully
def set_video_params(self, params)
Expand source code
def set_video_params(self, params):
    """
    Set video parameters with validation.

    Args:
        params (VideoParams): Video parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    _validate_video_params(params)
    return super().set_video_params(params)

Set video parameters with validation.

Args

params : VideoParams
Video parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def stop(self)
Expand source code
def stop(self):
    """
    Stop RTMS client and release resources.

    This is equivalent to calling leave(), but with a more intuitive name.
    """
    return self.leave()

Stop RTMS client and release resources.

This is equivalent to calling leave(), but with a more intuitive name.

def subscribeEvent(self, events)
Expand source code
def subscribe_event(self, events):
    """
    Subscribe to receive specific event types.

    Note: Calling on_participant_event() automatically subscribes to
    EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

    Args:
        events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

    Returns:
        bool: True if subscription was successful
    """
    return super().subscribe_event(events)

Subscribe to receive specific event types.

Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

Args

events
List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

Returns

bool
True if subscription was successful
def subscribeVideo(self, user_id: int, subscribe: bool) ‑> None
Expand source code
def subscribe_video(self, user_id: int, subscribe: bool) -> None:
    """Subscribe or unsubscribe from an individual participant's video stream.

    Args:
        user_id (int): The participant's user ID.
        subscribe (bool): True to subscribe, False to unsubscribe.
    """
    return super().subscribe_video(user_id, subscribe)

Subscribe or unsubscribe from an individual participant's video stream.

Args

user_id : int
The participant's user ID.
subscribe : bool
True to subscribe, False to unsubscribe.
def subscribe_event(self, events)
Expand source code
def subscribe_event(self, events):
    """
    Subscribe to receive specific event types.

    Note: Calling on_participant_event() automatically subscribes to
    EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

    Args:
        events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

    Returns:
        bool: True if subscription was successful
    """
    return super().subscribe_event(events)

Subscribe to receive specific event types.

Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

Args

events
List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

Returns

bool
True if subscription was successful
def subscribe_video(self, user_id: int, subscribe: bool) ‑> None
Expand source code
def subscribe_video(self, user_id: int, subscribe: bool) -> None:
    """Subscribe or unsubscribe from an individual participant's video stream.

    Args:
        user_id (int): The participant's user ID.
        subscribe (bool): True to subscribe, False to unsubscribe.
    """
    return super().subscribe_video(user_id, subscribe)

Subscribe or unsubscribe from an individual participant's video stream.

Args

user_id : int
The participant's user ID.
subscribe : bool
True to subscribe, False to unsubscribe.
def unsubscribeEvent(self, events)
Expand source code
def unsubscribe_event(self, events):
    """
    Unsubscribe from specific event types.

    Args:
        events: List of event type constants

    Returns:
        bool: True if unsubscription was successful
    """
    return super().unsubscribe_event(events)

Unsubscribe from specific event types.

Args

events
List of event type constants

Returns

bool
True if unsubscription was successful
def unsubscribe_event(self, events)
Expand source code
def unsubscribe_event(self, events):
    """
    Unsubscribe from specific event types.

    Args:
        events: List of event type constants

    Returns:
        bool: True if unsubscription was successful
    """
    return super().unsubscribe_event(events)

Unsubscribe from specific event types.

Args

events
List of event type constants

Returns

bool
True if unsubscription was successful
class DataOption (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var AUDIO_MIXED_STREAM

The type of the None singleton.

var AUDIO_MULTI_STREAMS

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

The type of the None singleton.

var VIDEO_MIXED_SPEAKER_VIEW

The type of the None singleton.

var VIDEO_SINGLE_ACTIVE_STREAM

The type of the None singleton.

var VIDEO_SINGLE_INDIVIDUAL_STREAM

The type of the None singleton.

class AudioDataOption (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var AUDIO_MIXED_STREAM

The type of the None singleton.

var AUDIO_MULTI_STREAMS

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

The type of the None singleton.

var VIDEO_MIXED_SPEAKER_VIEW

The type of the None singleton.

var VIDEO_SINGLE_ACTIVE_STREAM

The type of the None singleton.

var VIDEO_SINGLE_INDIVIDUAL_STREAM

The type of the None singleton.

class VideoDataOption (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var AUDIO_MIXED_STREAM

The type of the None singleton.

var AUDIO_MULTI_STREAMS

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

The type of the None singleton.

var VIDEO_MIXED_SPEAKER_VIEW

The type of the None singleton.

var VIDEO_SINGLE_ACTIVE_STREAM

The type of the None singleton.

var VIDEO_SINGLE_INDIVIDUAL_STREAM

The type of the None singleton.

class DeskshareParams (...)

init(args, *kwargs) Overloaded function.

  1. init(self: rtms._rtms.DeskshareParams) -> None

  2. init(self: rtms._rtms.DeskshareParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, resolution: typing.SupportsInt | typing.SupportsIndex, fps: typing.SupportsInt | typing.SupportsIndex) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop codec
prop contentType
prop content_type
prop data_opt
prop fps
prop resolution
class EventLoop (poll_interval: float = 0.01, name: str = None)
Expand source code
class EventLoop:
    """
    An SDK I/O thread that owns one or more Client lifecycles.

    The Zoom C SDK requires that alloc(), join(), poll(), and release() all run
    on the same OS thread. EventLoop is that thread. Clients assigned to a loop
    via add() will have their entire lifecycle managed on the loop's thread.

    Usage::

        loop = rtms.EventLoop()

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client(executor=EXECUTOR)
            client.on_audio_data(on_audio)
            loop.add(client)
            client.join(payload['payload'])

        await loop.run_async()   # or loop.run() to block

    Callbacks are dispatched according to the executor set on each Client:
    - No executor: callback runs inline on the loop's thread (simple, low latency)
    - executor=ThreadPoolExecutor(...): heavy work offloaded to worker pool
    - async def callback: bridged to the asyncio event loop via run_coroutine_threadsafe
    """

    def __init__(self, poll_interval: float = 0.01, name: str = None):
        """
        Args:
            poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
            name: Optional thread name for debugging
        """
        self._poll_interval = poll_interval
        self._name = name
        self._clients: List['Client'] = []
        self._clients_lock = threading.Lock()
        self._pending: List['Client'] = []   # clients waiting for alloc+join on this thread
        self._pending_lock = threading.Lock()
        self._running = False
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    @property
    def client_count(self) -> int:
        """Number of clients currently owned by this loop."""
        with self._clients_lock:
            return len(self._clients)

    def add(self, client: 'Client') -> None:
        """
        Assign a client to this loop's thread.

        Must be called before client.join(). The loop's thread will call
        alloc() and join() on behalf of the client.

        Args:
            client: A Client whose join() will be deferred to this loop's thread
        """
        client._assigned_loop = self
        with self._pending_lock:
            self._pending.append(client)

    def _drain_pending(self) -> None:
        """Called from the loop's thread — alloc and join all waiting clients."""
        with self._pending_lock:
            pending = self._pending[:]
            self._pending.clear()

        for client in pending:
            try:
                client._do_alloc_and_join()
                with self._clients_lock:
                    self._clients.append(client)
            except Exception as e:
                log_error('eventloop', f'Failed to alloc/join client: {e}')
                traceback.print_exc()

    def _poll_all(self) -> None:
        """Poll all active clients. Removes clients that have left."""
        with self._clients_lock:
            active = self._clients[:]

        to_remove = []
        for client in active:
            if client._running:
                try:
                    client.poll()
                except Exception as e:
                    log_error('eventloop', f'Error polling client: {e}')
                    to_remove.append(client)
            else:
                to_remove.append(client)

        if to_remove:
            with self._clients_lock:
                for c in to_remove:
                    self._clients.discard(c) if hasattr(self._clients, 'discard') else None
            with self._clients_lock:
                self._clients = [c for c in self._clients if c not in to_remove]

    def run(self, stop_on_empty: bool = False) -> None:
        """
        Run the event loop on the current thread (blocking).

        The current thread becomes the SDK I/O thread. Use this when you want
        explicit control of which thread drives the loop.

        Args:
            stop_on_empty: Stop automatically when all clients have left
        """
        self._running = True
        self._stop_event.clear()
        log_info('eventloop', f'Starting event loop{" (" + self._name + ")" if self._name else ""} '
                              f'(poll_interval={self._poll_interval}s)')
        try:
            while self._running and not self._stop_event.is_set():
                self._drain_pending()
                self._poll_all()
                if stop_on_empty:
                    with self._clients_lock:
                        if not self._clients and not self._pending:
                            break
                time.sleep(self._poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self._running = False
            log_debug('eventloop', 'Event loop stopped')

    async def run_async(self, stop_on_empty: bool = False) -> None:
        """
        Run the event loop as an asyncio coroutine.

        Yields to the asyncio event loop between poll cycles so other coroutines
        (aiohttp, FastAPI, asyncpg, etc.) run freely. Async callbacks registered
        on clients are automatically bridged to this event loop.

        Args:
            stop_on_empty: Stop automatically when all clients have left

        Example::

            async def main():
                loop = rtms.EventLoop()
                await asyncio.gather(loop.run_async(), aiohttp_app.start())

            asyncio.run(main())
        """
        self._running = True
        self._stop_event.clear()
        log_info('eventloop', f'Starting async event loop{" (" + self._name + ")" if self._name else ""} '
                              f'(poll_interval={self._poll_interval}s)')
        try:
            while self._running and not self._stop_event.is_set():
                self._drain_pending()
                self._poll_all()
                if stop_on_empty:
                    with self._clients_lock:
                        if not self._clients and not self._pending:
                            break
                await asyncio.sleep(self._poll_interval)
        except asyncio.CancelledError:
            log_info('eventloop', 'Async event loop cancelled')
        finally:
            self._running = False
            log_debug('eventloop', 'Async event loop stopped')

    def start(self) -> 'EventLoop':
        """
        Start the event loop in a background daemon thread.

        Returns self for chaining::

            loop = rtms.EventLoop().start()

        The thread runs until stop() is called or the process exits.
        """
        self._thread = threading.Thread(
            target=self.run,
            name=self._name or 'rtms-eventloop',
            daemon=True,
        )
        self._thread.start()
        log_debug('eventloop', f'Background thread started: {self._thread.name}')
        return self

    def stop(self) -> None:
        """Signal the event loop to stop after the current poll cycle."""
        self._running = False
        self._stop_event.set()

    def join(self, timeout: float = None) -> None:
        """Wait for the background thread to finish (only valid after start())."""
        if self._thread:
            self._thread.join(timeout=timeout)

An SDK I/O thread that owns one or more Client lifecycles.

The Zoom C SDK requires that alloc(), join(), poll(), and release() all run on the same OS thread. EventLoop is that thread. Clients assigned to a loop via add() will have their entire lifecycle managed on the loop's thread.

Usage::

loop = rtms.EventLoop()

@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client(executor=EXECUTOR)
    client.on_audio_data(on_audio)
    loop.add(client)
    client.join(payload['payload'])

await loop.run_async()   # or loop.run() to block

Callbacks are dispatched according to the executor set on each Client: - No executor: callback runs inline on the loop's thread (simple, low latency) - executor=ThreadPoolExecutor(…): heavy work offloaded to worker pool - async def callback: bridged to the asyncio event loop via run_coroutine_threadsafe

Args

poll_interval
Seconds between poll cycles (default: 0.01 = 10ms)
name
Optional thread name for debugging

Instance variables

prop client_count : int
Expand source code
@property
def client_count(self) -> int:
    """Number of clients currently owned by this loop."""
    with self._clients_lock:
        return len(self._clients)

Number of clients currently owned by this loop.

Methods

def add(self, client: Client) ‑> None
Expand source code
def add(self, client: 'Client') -> None:
    """
    Assign a client to this loop's thread.

    Must be called before client.join(). The loop's thread will call
    alloc() and join() on behalf of the client.

    Args:
        client: A Client whose join() will be deferred to this loop's thread
    """
    client._assigned_loop = self
    with self._pending_lock:
        self._pending.append(client)

Assign a client to this loop's thread.

Must be called before client.join(). The loop's thread will call alloc() and join() on behalf of the client.

Args

client
A Client whose join() will be deferred to this loop's thread
def join(self, timeout: float = None) ‑> None
Expand source code
def join(self, timeout: float = None) -> None:
    """Wait for the background thread to finish (only valid after start())."""
    if self._thread:
        self._thread.join(timeout=timeout)

Wait for the background thread to finish (only valid after start()).

def run(self, stop_on_empty: bool = False) ‑> None
Expand source code
def run(self, stop_on_empty: bool = False) -> None:
    """
    Run the event loop on the current thread (blocking).

    The current thread becomes the SDK I/O thread. Use this when you want
    explicit control of which thread drives the loop.

    Args:
        stop_on_empty: Stop automatically when all clients have left
    """
    self._running = True
    self._stop_event.clear()
    log_info('eventloop', f'Starting event loop{" (" + self._name + ")" if self._name else ""} '
                          f'(poll_interval={self._poll_interval}s)')
    try:
        while self._running and not self._stop_event.is_set():
            self._drain_pending()
            self._poll_all()
            if stop_on_empty:
                with self._clients_lock:
                    if not self._clients and not self._pending:
                        break
            time.sleep(self._poll_interval)
    except KeyboardInterrupt:
        pass
    finally:
        self._running = False
        log_debug('eventloop', 'Event loop stopped')

Run the event loop on the current thread (blocking).

The current thread becomes the SDK I/O thread. Use this when you want explicit control of which thread drives the loop.

Args

stop_on_empty
Stop automatically when all clients have left
async def run_async(self, stop_on_empty: bool = False) ‑> None
Expand source code
async def run_async(self, stop_on_empty: bool = False) -> None:
    """
    Run the event loop as an asyncio coroutine.

    Yields to the asyncio event loop between poll cycles so other coroutines
    (aiohttp, FastAPI, asyncpg, etc.) run freely. Async callbacks registered
    on clients are automatically bridged to this event loop.

    Args:
        stop_on_empty: Stop automatically when all clients have left

    Example::

        async def main():
            loop = rtms.EventLoop()
            await asyncio.gather(loop.run_async(), aiohttp_app.start())

        asyncio.run(main())
    """
    self._running = True
    self._stop_event.clear()
    log_info('eventloop', f'Starting async event loop{" (" + self._name + ")" if self._name else ""} '
                          f'(poll_interval={self._poll_interval}s)')
    try:
        while self._running and not self._stop_event.is_set():
            self._drain_pending()
            self._poll_all()
            if stop_on_empty:
                with self._clients_lock:
                    if not self._clients and not self._pending:
                        break
            await asyncio.sleep(self._poll_interval)
    except asyncio.CancelledError:
        log_info('eventloop', 'Async event loop cancelled')
    finally:
        self._running = False
        log_debug('eventloop', 'Async event loop stopped')

Run the event loop as an asyncio coroutine.

Yields to the asyncio event loop between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg, etc.) run freely. Async callbacks registered on clients are automatically bridged to this event loop.

Args

stop_on_empty
Stop automatically when all clients have left

Example::

async def main():
    loop = rtms.EventLoop()
    await asyncio.gather(loop.run_async(), aiohttp_app.start())

asyncio.run(main())
def start(self) ‑> EventLoop
Expand source code
def start(self) -> 'EventLoop':
    """
    Start the event loop in a background daemon thread.

    Returns self for chaining::

        loop = rtms.EventLoop().start()

    The thread runs until stop() is called or the process exits.
    """
    self._thread = threading.Thread(
        target=self.run,
        name=self._name or 'rtms-eventloop',
        daemon=True,
    )
    self._thread.start()
    log_debug('eventloop', f'Background thread started: {self._thread.name}')
    return self

Start the event loop in a background daemon thread.

Returns self for chaining::

loop = rtms.EventLoop().start()

The thread runs until stop() is called or the process exits.

def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    """Signal the event loop to stop after the current poll cycle."""
    self._running = False
    self._stop_event.set()

Signal the event loop to stop after the current poll cycle.

class EventLoopPool (threads: int = 4, poll_interval: float = 0.01, strategy: str = 'least_loaded')
Expand source code
class EventLoopPool:
    """
    A pool of EventLoop threads that distributes clients across N SDK I/O threads.

    Use this for high-concurrency deployments where many clients share a fixed
    number of threads. Each client is permanently assigned to one loop for its
    entire lifetime.

    Usage::

        pool = rtms.EventLoopPool(threads=4)

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client(executor=EXECUTOR)
            client.on_audio_data(on_audio)
            pool.add(client)          # routed to least-loaded loop
            client.join(payload['payload'])

        await pool.run_async()        # or pool.run()

    Scaling guidance:
    - 1 thread per ~25 clients is a reasonable starting point
    - Use executor= on Client for CPU/IO-heavy callbacks
    - Monitor loop.client_count to tune thread count
    """

    def __init__(
        self,
        threads: int = 4,
        poll_interval: float = 0.01,
        strategy: str = 'least_loaded',
    ):
        """
        Args:
            threads: Number of SDK I/O threads (default: 4)
            poll_interval: Seconds between poll cycles per loop (default: 0.01)
            strategy: Client routing strategy — 'least_loaded' or 'round_robin'
        """
        if threads < 1:
            raise ValueError("threads must be >= 1")
        if strategy not in ('least_loaded', 'round_robin'):
            raise ValueError("strategy must be 'least_loaded' or 'round_robin'")
        self._loops = [
            EventLoop(poll_interval=poll_interval, name=f'rtms-pool-{i}')
            for i in range(threads)
        ]
        self._strategy = strategy
        self._rr_index = 0
        self._rr_lock = threading.Lock()

    @property
    def loops(self) -> List[EventLoop]:
        """The underlying EventLoop list."""
        return self._loops

    @property
    def client_count(self) -> int:
        """Total clients across all loops."""
        return sum(l.client_count for l in self._loops)

    def add(self, client: 'Client') -> EventLoop:
        """
        Assign a client to a loop according to the routing strategy.

        Returns the EventLoop the client was assigned to.
        """
        if self._strategy == 'least_loaded':
            loop = min(self._loops, key=lambda l: l.client_count)
        else:  # round_robin
            with self._rr_lock:
                loop = self._loops[self._rr_index % len(self._loops)]
                self._rr_index += 1
        loop.add(client)
        return loop

    def run(self, stop_on_empty: bool = False) -> None:
        """
        Run all loops. Starts N-1 loops as background daemon threads and runs
        the last one on the current thread (blocking).
        """
        for loop in self._loops[:-1]:
            loop.start()
        self._loops[-1].run(stop_on_empty=stop_on_empty)

    async def run_async(self, stop_on_empty: bool = False) -> None:
        """
        Run all loops as asyncio coroutines concurrently.
        """
        await asyncio.gather(*[l.run_async(stop_on_empty=stop_on_empty) for l in self._loops])

    def stop(self) -> None:
        """Stop all loops."""
        for loop in self._loops:
            loop.stop()

A pool of EventLoop threads that distributes clients across N SDK I/O threads.

Use this for high-concurrency deployments where many clients share a fixed number of threads. Each client is permanently assigned to one loop for its entire lifetime.

Usage::

pool = rtms.EventLoopPool(threads=4)

@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client(executor=EXECUTOR)
    client.on_audio_data(on_audio)
    pool.add(client)          # routed to least-loaded loop
    client.join(payload['payload'])

await pool.run_async()        # or pool.run()

Scaling guidance: - 1 thread per ~25 clients is a reasonable starting point - Use executor= on Client for CPU/IO-heavy callbacks - Monitor loop.client_count to tune thread count

Args

threads
Number of SDK I/O threads (default: 4)
poll_interval
Seconds between poll cycles per loop (default: 0.01)
strategy
Client routing strategy — 'least_loaded' or 'round_robin'

Instance variables

prop client_count : int
Expand source code
@property
def client_count(self) -> int:
    """Total clients across all loops."""
    return sum(l.client_count for l in self._loops)

Total clients across all loops.

prop loops : List[EventLoop]
Expand source code
@property
def loops(self) -> List[EventLoop]:
    """The underlying EventLoop list."""
    return self._loops

The underlying EventLoop list.

Methods

def add(self, client: Client) ‑> EventLoop
Expand source code
def add(self, client: 'Client') -> EventLoop:
    """
    Assign a client to a loop according to the routing strategy.

    Returns the EventLoop the client was assigned to.
    """
    if self._strategy == 'least_loaded':
        loop = min(self._loops, key=lambda l: l.client_count)
    else:  # round_robin
        with self._rr_lock:
            loop = self._loops[self._rr_index % len(self._loops)]
            self._rr_index += 1
    loop.add(client)
    return loop

Assign a client to a loop according to the routing strategy.

Returns the EventLoop the client was assigned to.

def run(self, stop_on_empty: bool = False) ‑> None
Expand source code
def run(self, stop_on_empty: bool = False) -> None:
    """
    Run all loops. Starts N-1 loops as background daemon threads and runs
    the last one on the current thread (blocking).
    """
    for loop in self._loops[:-1]:
        loop.start()
    self._loops[-1].run(stop_on_empty=stop_on_empty)

Run all loops. Starts N-1 loops as background daemon threads and runs the last one on the current thread (blocking).

async def run_async(self, stop_on_empty: bool = False) ‑> None
Expand source code
async def run_async(self, stop_on_empty: bool = False) -> None:
    """
    Run all loops as asyncio coroutines concurrently.
    """
    await asyncio.gather(*[l.run_async(stop_on_empty=stop_on_empty) for l in self._loops])

Run all loops as asyncio coroutines concurrently.

def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    """Stop all loops."""
    for loop in self._loops:
        loop.stop()

Stop all loops.

class EventType (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var ACTIVE_SPEAKER_CHANGE

The type of the None singleton.

var CONSUMER_ANSWERED

The type of the None singleton.

var CONSUMER_END

The type of the None singleton.

var FIRST_PACKET_TIMESTAMP

The type of the None singleton.

var MEDIA_CONNECTION_INTERRUPTED

The type of the None singleton.

var PARTICIPANT_JOIN

The type of the None singleton.

var PARTICIPANT_LEAVE

The type of the None singleton.

var PARTICIPANT_VIDEO_OFF

The type of the None singleton.

var PARTICIPANT_VIDEO_ON

The type of the None singleton.

var SHARING_START

The type of the None singleton.

var SHARING_STOP

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

var USER_ANSWERED

The type of the None singleton.

var USER_END

The type of the None singleton.

var USER_HOLD

The type of the None singleton.

var USER_UNHOLD

The type of the None singleton.

class LogFormat (*args, **kwds)
Expand source code
class LogFormat(str, Enum):
    """Available log output formats"""
    PROGRESSIVE = 'progressive'
    JSON = 'json'

Available log output formats

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var JSON

The type of the None singleton.

var PROGRESSIVE

The type of the None singleton.

class LogLevel (*args, **kwds)
Expand source code
class LogLevel(IntEnum):
    """Available log levels for RTMS SDK logging"""
    ERROR = 0
    WARN = 1
    INFO = 2
    DEBUG = 3
    TRACE = 4

Available log levels for RTMS SDK logging

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var DEBUG

The type of the None singleton.

var ERROR

The type of the None singleton.

var INFO

The type of the None singleton.

var TRACE

The type of the None singleton.

var WARN

The type of the None singleton.

class MediaDataType (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var ALL

The type of the None singleton.

var AUDIO

The type of the None singleton.

var CHAT

The type of the None singleton.

var DESKSHARE

The type of the None singleton.

var TRANSCRIPT

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

var VIDEO

The type of the None singleton.

class MessageType (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var CLIENT_READY_ACK

The type of the None singleton.

var DATA_HAND_SHAKE_REQ

The type of the None singleton.

var DATA_HAND_SHAKE_RESP

The type of the None singleton.

var EVENT_SUBSCRIPTION

The type of the None singleton.

var EVENT_UPDATE

The type of the None singleton.

var KEEP_ALIVE_REQ

The type of the None singleton.

var KEEP_ALIVE_RESP

The type of the None singleton.

var MEDIA_DATA_AUDIO

The type of the None singleton.

var MEDIA_DATA_CHAT

The type of the None singleton.

var MEDIA_DATA_SHARE

The type of the None singleton.

var MEDIA_DATA_TRANSCRIPT

The type of the None singleton.

var MEDIA_DATA_VIDEO

The type of the None singleton.

var META_DATA_AUDIO

The type of the None singleton.

var META_DATA_CHAT

The type of the None singleton.

var META_DATA_SHARE

The type of the None singleton.

var META_DATA_TRANSCRIPT

The type of the None singleton.

var META_DATA_VIDEO

The type of the None singleton.

var SESSION_STATE_REQ

The type of the None singleton.

var SESSION_STATE_RESP

The type of the None singleton.

var SESSION_STATE_UPDATE

The type of the None singleton.

var SIGNALING_HAND_SHAKE_REQ

The type of the None singleton.

var SIGNALING_HAND_SHAKE_RESP

The type of the None singleton.

var STREAM_CLOSE_REQ

The type of the None singleton.

var STREAM_CLOSE_RESP

The type of the None singleton.

var STREAM_STATE_REQ

The type of the None singleton.

var STREAM_STATE_RESP

The type of the None singleton.

var STREAM_STATE_UPDATE

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

var VIDEO_SUBSCRIPTION_REQ

The type of the None singleton.

var VIDEO_SUBSCRIPTION_RESP

The type of the None singleton.

class Metadata (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop aiInterpreter
prop endTs
prop startTs
prop userId
prop userName
class Participant (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop id
prop name
class Session (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop isActive
prop isPaused
prop meetingId
prop sessionId
prop statTime
prop status
prop streamId
class SessionState (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var INACTIVE

The type of the None singleton.

var INITIALIZE

The type of the None singleton.

var PAUSED

The type of the None singleton.

var RESUMED

The type of the None singleton.

var STARTED

The type of the None singleton.

var STOPPED

The type of the None singleton.

class StopReason (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var ADMIN_DISABLED_APP

The type of the None singleton.

var AGENT_DISCONNECTED

The type of the None singleton.

var ALL_APPS_DISABLED

The type of the None singleton.

var AUTHENTICATION_FAILURE

The type of the None singleton.

var AWAIT_RECONNECTION_TIMEOUT

The type of the None singleton.

var CONNECTION_TIMEOUT

The type of the None singleton.

var CUSTOMER_DISCONNECTED

The type of the None singleton.

var DATA_CONNECTION_CLOSED_ABNORMALLY

The type of the None singleton.

var DATA_CONNECTION_INTERRUPTED

The type of the None singleton.

var EXIT_SIGNAL

The type of the None singleton.

var HOST_DISABLED_APP

The type of the None singleton.

var HOST_TRIGGERED

The type of the None singleton.

var INSTANCE_CONNECTION_INTERRUPTED

The type of the None singleton.

var INTERNAL_EXCEPTION

The type of the None singleton.

var KEEP_ALIVE_TIMEOUT

The type of the None singleton.

var MANUAL_API_TRIGGERED

The type of the None singleton.

var MEETING_ENDED

The type of the None singleton.

var RECEIVER_REQUEST_CLOSE

The type of the None singleton.

var SIGNAL_CONNECTION_CLOSED_ABNORMALLY

The type of the None singleton.

var SIGNAL_CONNECTION_INTERRUPTED

The type of the None singleton.

var STREAMING_NOT_SUPPORTED

The type of the None singleton.

var STREAM_CANCELED

The type of the None singleton.

var STREAM_REVOKED

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

var USER_EJECTED

The type of the None singleton.

var USER_LEFT

The type of the None singleton.

var USER_TRIGGERED

The type of the None singleton.

class StreamState (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var ACTIVE

The type of the None singleton.

var INACTIVE

The type of the None singleton.

var INTERRUPTED

The type of the None singleton.

var PAUSED

The type of the None singleton.

var RESUMED

The type of the None singleton.

var TERMINATED

The type of the None singleton.

var TERMINATING

The type of the None singleton.

class TranscriptLanguage (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var ARABIC

The type of the None singleton.

var BENGALI

The type of the None singleton.

var CANTONESE

The type of the None singleton.

var CATALAN

The type of the None singleton.

var CHINESE_SIMPLIFIED

The type of the None singleton.

var CHINESE_TRADITIONAL

The type of the None singleton.

var CZECH

The type of the None singleton.

var DANISH

The type of the None singleton.

var DUTCH

The type of the None singleton.

var ENGLISH

The type of the None singleton.

var ESTONIAN

The type of the None singleton.

var FINNISH

The type of the None singleton.

var FRENCH_CANADA

The type of the None singleton.

var FRENCH_FRANCE

The type of the None singleton.

var GERMAN

The type of the None singleton.

var HEBREW

The type of the None singleton.

var HINDI

The type of the None singleton.

var HUNGARIAN

The type of the None singleton.

var INDONESIAN

The type of the None singleton.

var ITALIAN

The type of the None singleton.

var JAPANESE

The type of the None singleton.

var KOREAN

The type of the None singleton.

var MALAY

The type of the None singleton.

var NONE

The type of the None singleton.

var PERSIAN

The type of the None singleton.

var POLISH

The type of the None singleton.

var PORTUGUESE

The type of the None singleton.

var ROMANIAN

The type of the None singleton.

var RUSSIAN

The type of the None singleton.

var SPANISH

The type of the None singleton.

var SWEDISH

The type of the None singleton.

var TAGALOG

The type of the None singleton.

var TAMIL

The type of the None singleton.

var TELUGU

The type of the None singleton.

var THAI

The type of the None singleton.

var TURKISH

The type of the None singleton.

var UKRAINIAN

The type of the None singleton.

var VIETNAMESE

The type of the None singleton.

class TranscriptParams (...)

init(self: rtms._rtms.TranscriptParams) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop contentType
prop content_type
prop enableLid
prop enable_lid
prop srcLanguage
prop src_language
class VideoCodec (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var H264

The type of the None singleton.

var JPG

The type of the None singleton.

var PNG

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

class VideoContentType (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var FILE_STREAM

The type of the None singleton.

var RAW_VIDEO

The type of the None singleton.

var RTP

The type of the None singleton.

var TEXT

The type of the None singleton.

var UNDEFINED

The type of the None singleton.

class VideoParams (...)

init(args, *kwargs) Overloaded function.

  1. init(self: rtms._rtms.VideoParams) -> None

  2. init(self: rtms._rtms.VideoParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, resolution: typing.SupportsInt | typing.SupportsIndex, data_opt: typing.SupportsInt | typing.SupportsIndex, fps: typing.SupportsInt | typing.SupportsIndex) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop codec
prop contentType
prop content_type
prop dataOpt
prop data_opt
prop fps
prop resolution
class VideoResolution (*args, **kwds)

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var FHD

The type of the None singleton.

var HD

The type of the None singleton.

var QHD

The type of the None singleton.

var SD

The type of the None singleton.