Package rtms

Functions

def configure_logger(options: dict)
Expand source code
def configure_logger(options: dict):
    """Apply logger settings taken from ``options``.

    The keys ``'level'``, ``'format'`` and ``'enabled'`` are recognized. Each
    key that is present overwrites the matching module-level setting; keys
    that are absent leave the current value untouched. The resulting
    configuration is reported through ``log_info``.

    Args:
        options: Mapping holding any subset of the recognized keys.
    """
    global _log_level, _log_format, _log_enabled

    # A private sentinel separates "key absent" from "key present but None".
    absent = object()

    new_level = options.get('level', absent)
    if new_level is not absent:
        _log_level = new_level

    new_format = options.get('format', absent)
    if new_format is not absent:
        _log_format = new_format

    new_enabled = options.get('enabled', absent)
    if new_enabled is not absent:
        _log_enabled = new_enabled

    summary = f"Logger configured: level={_log_level}, format={_log_format}, enabled={_log_enabled}"
    log_info('logger', summary)

    # NOTE(review): this message talks about importing _rtms, which is
    # unrelated to logger configuration - confirm it belongs in this function.
    log_debug("rtms", "Importing _rtms module")

Configure the logger with the specified options

def generate_signature(client, secret, uuid, rtms_stream_id)
Expand source code
def generate_signature(client, secret, uuid, rtms_stream_id):
    """Generate the HMAC-SHA256 signature used for RTMS authentication.

    The signed message is ``"<client_id>,<uuid>,<rtms_stream_id>"`` and the
    key is the client secret; both are UTF-8 encoded.

    Explicitly passed credentials take precedence. The ``ZM_RTMS_CLIENT`` and
    ``ZM_RTMS_SECRET`` environment variables are consulted only when the
    corresponding argument is empty or ``None``.

    Args:
        client: Client ID, or a falsy value to fall back to ``ZM_RTMS_CLIENT``.
        secret: Client secret, or a falsy value to fall back to
            ``ZM_RTMS_SECRET``.
        uuid: Meeting UUID or session ID to sign.
        rtms_stream_id: RTMS stream ID to sign.

    Returns:
        str: Lower-case hexadecimal digest of the signature.

    Raises:
        EnvironmentError: If no client ID or no client secret is available
            from either the arguments or the environment.
    """
    # Argument first, environment second: an explicit credential must not be
    # silently overridden by whatever happens to be exported in the shell.
    client_id = client or os.getenv("ZM_RTMS_CLIENT")
    client_secret = secret or os.getenv("ZM_RTMS_SECRET")

    if not client_id:
        raise EnvironmentError("Client ID cannot be blank")
    if not client_secret:
        raise EnvironmentError("Client Secret cannot be blank")

    message = f"{client_id},{uuid},{rtms_stream_id}"

    return hmac.new(
        client_secret.encode('utf-8'),
        message.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()

Generate a signature for RTMS authentication

def initialize(ca_path=None)
Expand source code
def initialize(ca_path=None):
    """
    Initialize the RTMS SDK explicitly.

    Creating a Client instance initializes the SDK automatically, so calling
    this is only needed when a custom CA path should be used.

    The given path is first passed through ``find_ca_certificate``; that
    lookup runs outside the error handling below, so an exception raised by
    it propagates to the caller.

    Args:
        ca_path (str, optional): Path to the CA certificate file.

    Returns:
        bool: True if initialization was successful, False if the SDK raised
        (the error is logged).
    """
    resolved_ca = find_ca_certificate(ca_path)

    succeeded = False
    try:
        _ClientBase.initialize(resolved_ca)
        # Record success so Client() does not initialize a second time.
        Client._sdk_initialized = True
        succeeded = True
    except Exception as exc:
        log_error("rtms", f"Error initializing RTMS SDK: {exc}")
    return succeeded

Initialize the RTMS SDK.

Note: The SDK is automatically initialized when you create a Client instance. You only need to call this if you want to initialize with a custom CA path.

Args

ca_path : str, optional
Path to the CA certificate file.

Returns

bool
True if initialization was successful
def log_debug(component, message, details=None)
Expand source code
def log_debug(component, message, details=None):
    """Emit ``message`` for ``component`` at DEBUG level.

    ``details`` is forwarded unchanged to the shared ``_log`` helper.
    """
    level = LogLevel.DEBUG
    _log(level, component, message, details)

Log a debug message

def log_error(component, message, details=None)
Expand source code
def log_error(component, message, details=None):
    """Emit ``message`` for ``component`` at ERROR level.

    ``details`` is forwarded unchanged to the shared ``_log`` helper.
    """
    level = LogLevel.ERROR
    _log(level, component, message, details)

Log an error message

def log_info(component, message, details=None)
Expand source code
def log_info(component, message, details=None):
    """Emit ``message`` for ``component`` at INFO level.

    ``details`` is forwarded unchanged to the shared ``_log`` helper.
    """
    level = LogLevel.INFO
    _log(level, component, message, details)

Log an info message

def log_warn(component, message, details=None)
Expand source code
def log_warn(component, message, details=None):
    """Emit ``message`` for ``component`` at WARN level.

    ``details`` is forwarded unchanged to the shared ``_log`` helper.
    """
    level = LogLevel.WARN
    _log(level, component, message, details)

Log a warning message

def onWebhookEvent(callback=None, port=None, path=None)
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Register a handler for Zoom webhook events and start the webhook server.

    A single module-level WebhookServer is shared by all registrations. It is
    created the first time a handler is registered, so the port and path of
    later registrations have no effect once the server exists.

    Works as a bare decorator, as a decorator with arguments, or as a plain
    function call:

    @rtms.onWebhookEvent(port=8080, path='/webhook')
    def handle_webhook(payload):
        if payload.get('event') == 'meeting.rtms.started':
            # Create a client and join
            client = rtms.Client()
            client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: The callback itself when one was passed in, otherwise a
        decorator that registers the function it is applied to.
    """
    # Resolve the listening address once; the closure below captures it.
    listen_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    listen_path = path or os.getenv('ZM_RTMS_PATH', '/')

    def register(handler):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(listen_port, listen_path)
        _webhook_server.start(handler)
        return handler

    # No usable callback yet: hand back the decorator for later application.
    if callback is None or not callable(callback):
        return register

    # Bare-decorator / direct-call form: register right away.
    return register(callback)

Start a webhook server to receive events from Zoom.

This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.

Can be used as a decorator or a direct function call:

@rtms.onWebhookEvent(port=8080, path='/webhook')
def handle_webhook(payload):
    if payload.get('event') == 'meeting.rtms.started':
        # Create a client and join
        client = rtms.Client()
        client.join(...)

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def on_webhook_event(callback=None, port=None, path=None)
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Register a handler for Zoom webhook events and start the webhook server.

    A single module-level WebhookServer is shared by all registrations. It is
    created the first time a handler is registered, so the port and path of
    later registrations have no effect once the server exists.

    Works as a bare decorator, as a decorator with arguments, or as a plain
    function call:

    @rtms.onWebhookEvent(port=8080, path='/webhook')
    def handle_webhook(payload):
        if payload.get('event') == 'meeting.rtms.started':
            # Create a client and join
            client = rtms.Client()
            client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: The callback itself when one was passed in, otherwise a
        decorator that registers the function it is applied to.
    """
    # Resolve the listening address once; the closure below captures it.
    listen_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    listen_path = path or os.getenv('ZM_RTMS_PATH', '/')

    def register(handler):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(listen_port, listen_path)
        _webhook_server.start(handler)
        return handler

    # No usable callback yet: hand back the decorator for later application.
    if callback is None or not callable(callback):
        return register

    # Bare-decorator / direct-call form: register right away.
    return register(callback)

Start a webhook server to receive events from Zoom.

This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.

Can be used as a decorator or a direct function call:

@rtms.onWebhookEvent(port=8080, path='/webhook')
def handle_webhook(payload):
    if payload.get('event') == 'meeting.rtms.started':
        # Create a client and join
        client = rtms.Client()
        client.join(...)

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def run(poll_interval: float = 0.01, stop_on_empty: bool = False)
Expand source code
def run(poll_interval: float = 0.01, stop_on_empty: bool = False):
    """
    Run the RTMS event loop on the calling thread until it is stopped.

    Each cycle of the loop:
    - runs operations queued by other threads (such as webhook handlers)
    - polls every registered client whose ``_running`` flag is set
    - sleeps for ``poll_interval`` seconds

    The loop ends when stop() is called, on KeyboardInterrupt, or - with
    ``stop_on_empty`` - as soon as a cycle finds no registered clients.
    On exit the loop state is reset and _cleanup_all_clients() is invoked.

    Args:
        poll_interval: Time in seconds between poll cycles (default: 0.01 = 10ms)
        stop_on_empty: If True, stop when no clients remain (default: False)

    Example:
        >>> import rtms
        >>>
        >>> clients = {}
        >>>
        >>> @rtms.onWebhookEvent
        ... def handle(payload):
        ...     client = rtms.Client()
        ...     clients[payload['payload']['rtms_stream_id']] = client
        ...     client.onTranscriptData(lambda d,s,t,m: print(m.userName, d))
        ...     client.join(payload['payload'])
        >>>
        >>> rtms.run()  # Blocks until interrupted
    """
    global _main_thread_id, _running

    # Publish the loop thread's identity so joins issued from other threads
    # can be queued for this thread instead of running where they were called.
    _main_thread_id = threading.get_ident()
    _running = True
    _stop_event.clear()

    log_info('rtms', f'Starting RTMS event loop (poll_interval={poll_interval}s)')

    try:
        while True:
            if not _running or _stop_event.is_set():
                break

            # Work queued by other threads runs first.
            _process_pending_operations()

            # Snapshot under the lock, then poll without holding it.
            with _clients_lock:
                snapshot = list(_clients.values())

            for active in snapshot:
                if not active._running:
                    continue
                try:
                    active.poll()
                except Exception as exc:
                    log_error('rtms', f'Error polling client: {exc}')

            if stop_on_empty and not snapshot:
                log_info('rtms', 'No active clients, stopping event loop')
                break

            time.sleep(poll_interval)

    except KeyboardInterrupt:
        log_info('rtms', 'Received interrupt, shutting down...')
    finally:
        _running = False
        _main_thread_id = None
        _cleanup_all_clients()

Start the RTMS event loop.

This function blocks and handles:

  • Polling all active clients
  • Processing pending operations from other threads (like webhook handlers)
  • Graceful shutdown on KeyboardInterrupt

Args

poll_interval
Time in seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty
If True, stop when no clients remain (default: False)

Example

>>> import rtms
>>>
>>> clients = {}
>>>
>>> @rtms.onWebhookEvent
>>> def handle(payload):
>>>     client = rtms.Client()
>>>     clients[payload['payload']['rtms_stream_id']] = client
>>>     client.onTranscriptData(lambda d,s,t,m: print(m.userName, d))
>>>     client.join(payload['payload'])
>>>
>>> rtms.run()  # Blocks until interrupted
def stop()
Expand source code
def stop():
    """
    Signal the event loop to stop.

    Call this from another thread to gracefully stop the rtms.run() loop.

    Both stop indicators that the loop condition in run() checks are updated:
    the module-level ``_running`` flag is cleared and ``_stop_event`` is set.
    The loop notices on its next iteration; this function does not wait for
    it to finish.
    """
    global _running
    _running = False
    _stop_event.set()
    log_info('rtms', 'Stop signal received')

Signal the event loop to stop.

Call this from another thread to gracefully stop the rtms.run() loop.

def uninitialize()
Expand source code
def uninitialize():
    """
    Shut down the RTMS SDK.

    Call this when you're done using the SDK to release resources. After a
    successful call the next Client() construction initializes the SDK again.

    Returns:
        bool: True if the SDK was uninitialized, False if it raised (the
        error is logged and the traceback printed).
    """
    released = False
    try:
        _ClientBase.uninitialize()
        Client._sdk_initialized = False
        released = True
    except Exception as exc:
        log_error("rtms", f"Error uninitializing RTMS SDK: {exc}")
        traceback.print_exc()
    return released

Uninitialize the RTMS SDK.

Call this when you're done using the SDK to release resources.

Classes

class AudioParams (...)

__init__(*args, **kwargs) Overloaded function.

  1. __init__(self: rtms._rtms.AudioParams) -> None

  2. __init__(self: rtms._rtms.AudioParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, sample_rate: typing.SupportsInt, channel: typing.SupportsInt, data_opt: typing.SupportsInt, duration: typing.SupportsInt, frame_size: typing.SupportsInt) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop channel
prop codec
prop contentType
prop dataOpt
prop duration
prop frameSize
prop sampleRate
class Client
Expand source code
class Client(_ClientBase):
    """
    RTMS Client - provides real-time media streaming capabilities

    This client allows you to join Zoom meetings and process audio, video,
    and transcript data in real-time.

    The SDK is initialized lazily the first time a Client is constructed;
    the class-level ``_sdk_initialized`` flag records that this has happened.
    Every instance registers itself in the module-level client registry on
    construction and removes itself again in leave().
    """

    # Shared by all instances: True once _ClientBase.initialize() succeeded.
    _sdk_initialized = False

    def __init__(self):
        """
        Initialize a new RTMS client.

        Raises:
            RuntimeError: If the SDK cannot be initialized with either the
                discovered CA certificate or an empty CA path.
        """
        # Ensure SDK is initialized before creating client instance
        if not Client._sdk_initialized:
            try:
                ca_path = find_ca_certificate()
                log_debug("client", f"Initializing SDK with CA: {ca_path}")
                _ClientBase.initialize(ca_path, 1, "python-rtms")
                Client._sdk_initialized = True
                log_debug("client", "SDK initialized successfully")
            except Exception as e:
                log_error("client", f"SDK initialization failed: {e}")
                # Try with empty path as fallback
                try:
                    log_debug("client", "Trying SDK initialization with empty CA path")
                    _ClientBase.initialize("", 1, "python-rtms")
                    Client._sdk_initialized = True
                    log_debug("client", "SDK initialized with empty CA path")
                except Exception as e2:
                    log_error("client", f"SDK initialization failed completely: {e2}")
                    # Chain the cause so the SDK error stays in the traceback.
                    raise RuntimeError(f"Failed to initialize RTMS SDK: {e2}") from e2

        super().__init__()
        self._polling_thread = None
        self._polling_interval = 10  # milliseconds
        self._running = False  # set by _start_polling(), cleared by _stop_polling()
        self._webhook_server = None

        # Register with global client registry
        with _clients_lock:
            _clients[id(self)] = self

    def join(self,
             meeting_uuid: str = None,
             session_id: str = None,
             rtms_stream_id: str = None,
             server_urls: str = None,
             signature: str = None,
             timeout: int = -1,
             ca: str = None,
             client: str = None,
             secret: str = None,
             poll_interval: int = 10,
             **kwargs):
        """
        Join a RTMS session.

        Can be called with individual arguments or with a single dictionary
        of parameters as the first argument (e.g. a webhook payload).

        Args:
            meeting_uuid (str): Meeting UUID (for Meeting SDK events), or a
                dict holding all join parameters
            session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
            rtms_stream_id (str): RTMS stream ID
            server_urls (str): Server URLs (comma-separated)
            signature (str, optional): Authentication signature. If not provided, will be generated
            timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
            ca (str, optional): CA certificate path. Accepted for API
                compatibility; the join itself does not read it.
            client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
            secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
            poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
            **kwargs: Additional arguments merged into the join parameters

        Returns:
            bool: True if joined successfully (or if the join was queued for
            the event-loop thread), False otherwise
        """
        try:
            # Dictionary-style call: join({...}) without extra keyword arguments.
            if not kwargs and isinstance(meeting_uuid, dict):
                return self._join_with_params(**meeting_uuid)

            # Parameter-style call: named parameters, overlaid with any extras.
            params = {
                'meeting_uuid': meeting_uuid,
                'session_id': session_id,
                'rtms_stream_id': rtms_stream_id,
                'server_urls': server_urls,
                'signature': signature,
                'timeout': timeout,
                'ca': ca,
                'client': client,
                'secret': secret,
                'poll_interval': poll_interval,
            }
            params.update(kwargs)
            return self._join_with_params(**params)
        except Exception as e:
            log_error("client", f"Error in join: {e}")
            traceback.print_exc()
            return False

    def _join_with_params(self, **params):
        """
        Internal method to join with parameter dictionary.

        IMPORTANT: Due to SDK threading constraints, the actual join() must be called
        from the same thread that runs the event loop. If rtms.run() hasn't been called,
        we proceed directly (backwards compatible). Otherwise, we queue for main thread.

        Returns:
            bool: The result of _do_join() when executed immediately, or True
            when the join was queued (the real outcome is not reported back).
        """
        # Join directly when no event loop is running, or when already on the
        # thread that runs it.
        if _main_thread_id is None or threading.get_ident() == _main_thread_id:
            return self._do_join(**params)

        # Queue the join for main thread execution
        log_debug("client", "Join called from non-main thread, queuing request")
        with _pending_lock:
            _pending_operations.append((self._do_join, (), params))
        log_debug("client", "Join request queued, will be processed by rtms.run()")
        return True  # Return immediately; actual join happens later

    def _do_join(self, **params):
        """
        Actually perform the join - always called on main thread or when no event loop.

        Validates the required parameters, generates a signature when none was
        supplied, joins through the base class and enables polling. Errors are
        logged and reported as a False return value rather than raised.

        Returns:
            bool: True if joined successfully, False otherwise
        """
        try:
            # Extract parameters with defaults. 'ca' may be present in params
            # but is not used by the join call.
            meeting_uuid = params.get('meeting_uuid')
            session_id = params.get('session_id')
            rtms_stream_id = params.get('rtms_stream_id')
            server_urls = params.get('server_urls')
            signature = params.get('signature')
            timeout = params.get('timeout', -1)
            # join() always supplies these keys (possibly as None), so fall
            # back to the environment on any empty value, not only a missing key.
            client = params.get('client') or os.getenv('ZM_RTMS_CLIENT')
            secret = params.get('secret') or os.getenv('ZM_RTMS_SECRET')
            poll_interval = params.get('poll_interval', 10)

            # Use meeting_uuid for Meeting SDK events, session_id for Video SDK events
            instance_id = meeting_uuid or session_id

            if not instance_id:
                raise ValueError("Either meeting_uuid or session_id is required")
            if not rtms_stream_id:
                raise ValueError("RTMS Stream ID is required")
            if not server_urls:
                raise ValueError("Server URLs is required")

            # Generate signature if not provided
            if not signature:
                try:
                    signature = generate_signature(client, secret, instance_id, rtms_stream_id)
                except Exception as e:
                    log_error("client", f"Error generating signature: {e}")
                    raise

            # Store polling interval
            self._polling_interval = poll_interval

            # Join the meeting/session
            log_info("client", f"Joining {'meeting' if meeting_uuid else 'session'}: {instance_id}")
            super().join(instance_id, rtms_stream_id, signature, server_urls, timeout)

            # Enable polling (a flag only - no thread is started here)
            self._start_polling()

            log_info("client", "Successfully joined")
            return True
        except Exception as e:
            log_error("client", f"Error joining: {e}")
            traceback.print_exc()
            return False

    def _initialize_rtms(self, ca_path=None):
        """
        Initialize the RTMS SDK with the best available CA certificate.

        Falls back to an empty CA path if the first attempt fails.

        Returns:
            bool: True if either attempt succeeded

        Raises:
            Exception: The error from the first attempt, if the fallback
                fails as well.
        """
        try:
            # Find the best CA certificate
            ca_path = find_ca_certificate(ca_path)

            # Initialize the SDK
            log_debug("client", f"Initializing RTMS with CA: {ca_path}")
            _ClientBase.initialize(ca_path)
            return True
        except Exception as e:
            log_error("client", f"Error initializing RTMS: {e}")
            traceback.print_exc()
            # Try with an empty path as a last resort
            try:
                log_debug("client", "Trying initialization with empty CA path")
                _ClientBase.initialize("")
                return True
            except Exception as e2:
                log_error("client", f"Failed to initialize with empty CA path: {e2}")
                raise e  # Raise the original error

    def _poll_if_needed(self):
        """
        Poll the RTMS client if needed.

        IMPORTANT: Due to SDK threading constraints, poll() must be called from the
        main thread (same thread that initialized the SDK). This should be called
        periodically from the main loop.

        Does nothing unless polling has been enabled; polling errors are
        logged, not raised.
        """
        if self._running:
            try:
                super().poll()
            except Exception as e:
                log_error("client", f"Error during polling: {e}")

    def _start_polling(self):
        """Mark that polling should begin (will be done from main thread)"""
        self._running = True
        log_debug("client", "Polling enabled - call _poll_if_needed() from main loop")

    def _stop_polling(self):
        """Stop polling by clearing the ``_running`` flag."""
        self._running = False
        log_debug("client", "Polling stopped")

    def stop(self):
        """
        Stop RTMS client and release resources.

        This is equivalent to calling leave(), but with a more intuitive name.

        Returns:
            bool: The result of leave()
        """
        return self.leave()

    def setAudioParams(self, params):
        """
        Set audio parameters with validation.

        Args:
            params (AudioParams): Audio parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_audio_params(params)
        return super().setAudioParams(params)

    def setVideoParams(self, params):
        """
        Set video parameters with validation.

        Args:
            params (VideoParams): Video parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_video_params(params)
        return super().setVideoParams(params)

    def setDeskshareParams(self, params):
        """
        Set deskshare parameters with validation.

        Args:
            params (DeskshareParams): Deskshare parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_deskshare_params(params)
        return super().setDeskshareParams(params)

    def subscribeEvent(self, events):
        """
        Subscribe to receive specific event types.

        Note: Calling onParticipantEvent() automatically subscribes to
        EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

        Args:
            events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

        Returns:
            bool: True if subscription was successful
        """
        return super().subscribeEvent(events)

    def unsubscribeEvent(self, events):
        """
        Unsubscribe from specific event types.

        Args:
            events: List of event type constants

        Returns:
            bool: True if unsubscription was successful
        """
        return super().unsubscribeEvent(events)

    def onParticipantEvent(self, callback: Callable[[str, int, list], None]) -> bool:
        """
        Register a callback for participant join/leave events.

        This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE.
        Events are delivered as parsed objects, not raw JSON.

        Args:
            callback: Function called with (event, timestamp, participants) where:
                - event: 'join' or 'leave'
                - timestamp: Unix timestamp in milliseconds
                - participants: List of dicts with 'user_id' and 'user_name'
                  (either value may be None when absent from the event)

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_participant(event, timestamp, participants):
            ...     print(f"Participant {event}: {participants}")
            >>> client.onParticipantEvent(on_participant)
        """
        def event_handler(event_data: str):
            try:
                data = json.loads(event_data)
                event_type = data.get('event_type')

                if event_type == EVENT_PARTICIPANT_JOIN:
                    label = 'join'
                elif event_type == EVENT_PARTICIPANT_LEAVE:
                    label = 'leave'
                else:
                    # Some other event type; not for this handler.
                    return

                participants = [
                    {'user_id': p.get('user_id'), 'user_name': p.get('user_name')}
                    for p in data.get('participants', [])
                ]
                callback(label, data.get('timestamp', 0), participants)
            except Exception as e:
                # Also reached when the user callback itself raises.
                log_error('client', f'Failed to parse participant event: {e}')

        # NOTE(review): every on*Event method hands its own handler to
        # onEventEx; if the SDK keeps only one such handler, registering a
        # second kind of event callback replaces this one - confirm.
        super().onEventEx(event_handler)
        try:
            self.subscribeEvent([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to participant events: {e}')
        return True

    def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) -> bool:
        """
        Register a callback for active speaker change events.

        This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

        Args:
            callback: Function called with (timestamp, user_id, user_name)

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_speaker(timestamp, user_id, user_name):
            ...     print(f"Active speaker: {user_name} ({user_id})")
            >>> client.onActiveSpeakerEvent(on_speaker)
        """
        def event_handler(event_data: str):
            try:
                data = json.loads(event_data)
                event_type = data.get('event_type')

                if event_type == EVENT_ACTIVE_SPEAKER_CHANGE:
                    callback(
                        data.get('timestamp', 0),
                        data.get('user_id', 0),
                        data.get('user_name', '')
                    )
            except Exception as e:
                # Also reached when the user callback itself raises.
                log_error('client', f'Failed to parse active speaker event: {e}')

        # NOTE(review): may replace a handler registered by another on*Event
        # method if onEventEx keeps a single handler - confirm.
        super().onEventEx(event_handler)
        try:
            self.subscribeEvent([EVENT_ACTIVE_SPEAKER_CHANGE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')
        return True

    def onSharingEvent(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
        """
        Register a callback for sharing start/stop events.

        This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP.
        Note: These events only work when the RTMS app has DESKSHARE scope permission.

        Args:
            callback: Function called with (event, timestamp, user_id, user_name) where:
                - event: 'start' or 'stop'
                - timestamp: Unix timestamp in milliseconds
                - user_id: Optional user ID (only for 'start')
                - user_name: Optional user name (only for 'start')

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_sharing(event, timestamp, user_id, user_name):
            ...     print(f"Sharing {event} by {user_name}")
            >>> client.onSharingEvent(on_sharing)
        """
        def event_handler(event_data: str):
            try:
                data = json.loads(event_data)
                event_type = data.get('event_type')

                if event_type == EVENT_SHARING_START:
                    callback(
                        'start',
                        data.get('timestamp', 0),
                        data.get('user_id'),
                        data.get('user_name')
                    )
                elif event_type == EVENT_SHARING_STOP:
                    # Stop events carry no user information.
                    callback(
                        'stop',
                        data.get('timestamp', 0),
                        None,
                        None
                    )
            except Exception as e:
                # Also reached when the user callback itself raises.
                log_error('client', f'Failed to parse sharing event: {e}')

        # NOTE(review): may replace a handler registered by another on*Event
        # method if onEventEx keeps a single handler - confirm.
        super().onEventEx(event_handler)
        try:
            self.subscribeEvent([EVENT_SHARING_START, EVENT_SHARING_STOP])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')
        return True

    def leave(self):
        """
        Leave the RTMS session and release its resources.

        Disables polling, removes this client from the global registry, stops
        the client's own webhook server (if any) and releases the SDK
        resources held by the base class.

        Returns:
            bool: True if left successfully, False if releasing raised
        """
        log_info("client", "Leaving RTMS session")

        # Disable polling
        self._stop_polling()

        # Unregister from global client registry
        with _clients_lock:
            _clients.pop(id(self), None)

        # Stop webhook server if we have one
        if self._webhook_server:
            self._webhook_server.stop()
            self._webhook_server = None

        try:
            # Release RTMS resources
            super().release()
            return True
        except Exception as e:
            log_error("client", f"Error releasing RTMS resources: {e}")
            traceback.print_exc()
            return False

    def on_webhook_event(self, callback=None, port=None, path=None):
        """
        Register a webhook event handler.

        This can be used as a decorator or a direct method call:

        @client.on_webhook_event(port=8080, path='/webhook')
        def handle_webhook(payload):
            print(f"Received webhook: {payload}")

        Args:
            callback (callable, optional): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

        Returns:
            callable: The callback itself when one was passed in, otherwise a
            decorator that registers the function it is applied to
        """
        # Direct call or bare decorator: a usable callback is already here.
        # Forward port/path too, so on_webhook_event(cb, port=...) listens on
        # the requested address instead of silently using the defaults.
        if callback is not None and callable(callback):
            self._start_webhook_server(callback, port, path)
            return callback

        # If used as a decorator with arguments or as a method call
        def decorator(func):
            self._start_webhook_server(func, port, path)
            return func

        return decorator

    def _start_webhook_server(self, callback, port=None, path=None):
        """
        Start the webhook server.

        The server is created on first use; when this client already has one,
        ``port`` and ``path`` are ignored and the existing server is started
        with the new callback.

        Args:
            callback (callable): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
        """
        if not self._webhook_server:
            port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
            path = path or os.getenv('ZM_RTMS_PATH', '/')

            self._webhook_server = WebhookServer(port, path)

        self._webhook_server.start(callback)

RTMS Client - provides real-time media streaming capabilities

This client allows you to join Zoom meetings and process audio, video, and transcript data in real-time.

Initialize a new RTMS client

Ancestors

  • rtms._rtms.Client
  • pybind11_builtins.pybind11_object

Methods

def join(self,
meeting_uuid: str = None,
session_id: str = None,
rtms_stream_id: str = None,
server_urls: str = None,
signature: str = None,
timeout: int = -1,
ca: str = None,
client: str = None,
secret: str = None,
poll_interval: int = 10,
**kwargs)
Expand source code
def join(self,
         meeting_uuid: str = None,
         session_id: str = None,
         rtms_stream_id: str = None,
         server_urls: str = None,
         signature: str = None,
         timeout: int = -1,
         ca: str = None,
         client: str = None,
         secret: str = None,
         poll_interval: int = 10,
         **kwargs):
    """
    Join an RTMS session.

    Two call styles are supported:

    * Parameter style: pass the values as positional/keyword arguments.
    * Dictionary style: pass a single dict of parameters as the first
      argument. In this style the dict supplies the parameters; the other
      named arguments of this method are not consulted.

    In both styles any extra keyword arguments are merged in last, so they
    override same-named entries.

    Args:
        meeting_uuid (str | dict): Meeting UUID (for Meeting SDK events), or a
            dictionary holding all join parameters
        session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
        rtms_stream_id (str): RTMS stream ID
        server_urls (str): Server URLs (comma-separated)
        signature (str, optional): Authentication signature. If not provided, will be generated
        timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
        ca (str, optional): CA certificate path. Defaults to system CA files.
        client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
        secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
        poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
        **kwargs: Additional arguments forwarded to the underlying join

    Returns:
        bool: True if joined successfully, False otherwise. Any exception
        raised while joining is logged and reported as False.
    """

    try:
        if isinstance(meeting_uuid, dict):
            # Dictionary-style call. Copy so the caller's dict is never
            # mutated when extra keyword arguments are merged in below.
            params = dict(meeting_uuid)
        else:
            params = {
                'meeting_uuid': meeting_uuid,
                'session_id': session_id,
                'rtms_stream_id': rtms_stream_id,
                'server_urls': server_urls,
                'signature': signature,
                'timeout': timeout,
                'ca': ca,
                'client': client,
                'secret': secret,
                'poll_interval': poll_interval
            }

        # Extra keyword arguments are honoured in both call styles. Previously
        # a dict combined with kwargs was forwarded as the meeting UUID itself.
        params.update(kwargs)
        return self._join_with_params(**params)
    except Exception as e:
        log_error("client", f"Error in join: {e}")
        traceback.print_exc()
        return False

Join an RTMS session.

Can be called with positional arguments or with a dictionary of parameters.

Args

meeting_uuid : str
Meeting UUID (for Meeting SDK events)
session_id : str
Session ID (for Video SDK events) - used when meeting_uuid is not provided
rtms_stream_id : str
RTMS stream ID
server_urls : str
Server URLs (comma-separated)
signature : str, optional
Authentication signature. If not provided, will be generated
timeout : int, optional
Timeout in milliseconds. Defaults to -1 (no timeout).
ca : str, optional
CA certificate path. Defaults to system CA files.
client : str, optional
Client ID. If empty, uses ZM_RTMS_CLIENT env var.
secret : str, optional
Client secret. If empty, uses ZM_RTMS_SECRET env var.
poll_interval : int, optional
Polling interval in milliseconds. Defaults to 10.
**kwargs
Additional arguments passed to join

Returns

bool
True if joined successfully, False otherwise
def leave(self)
Expand source code
def leave(self):
    """
    Tear down the RTMS session.

    Steps, in order: stop the polling thread, remove this client from the
    global client registry, stop and drop the webhook server (if one was
    started), then release the underlying RTMS resources.

    Returns:
        bool: True when the resources were released, False if releasing
        them raised an exception (the error is logged and the traceback
        printed).
    """
    log_info("client", "Leaving RTMS session")

    # Halt background polling before anything else is torn down
    self._stop_polling()

    # Drop this instance from the shared registry; a missing entry is fine
    with _clients_lock:
        _clients.pop(id(self), None)

    # Shut down the webhook listener when this client owns one
    server = self._webhook_server
    if server:
        server.stop()
        self._webhook_server = None

    try:
        # Hand the native resources back to the SDK
        super().release()
    except Exception as e:
        log_error("client", f"Error releasing RTMS resources: {e}")
        traceback.print_exc()
        return False
    else:
        return True

Leave the RTMS session and stop all threads.

Returns

bool
True if left successfully
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) ‑> bool
Expand source code
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) -> bool:
    """
    Install a handler for active speaker changes.

    A wrapper is registered through ``onEventEx``; it decodes each raw JSON
    event and invokes ``callback`` only for EVENT_ACTIVE_SPEAKER_CHANGE.
    The method then tries to subscribe to that event type; a failed
    subscription is logged as a warning and does not change the result.

    Args:
        callback: Function called with (timestamp, user_id, user_name).
            Missing fields default to 0, 0 and '' respectively.

    Returns:
        bool: Always True once the handler has been installed

    Example:
        >>> def on_speaker(timestamp, user_id, user_name):
        ...     print(f"Active speaker: {user_name} ({user_id})")
        >>> client.onActiveSpeakerEvent(on_speaker)
    """
    def _dispatch(raw_event: str):
        try:
            payload = json.loads(raw_event)

            # Ignore every event that is not a speaker change
            if payload.get('event_type') != EVENT_ACTIVE_SPEAKER_CHANGE:
                return

            timestamp = payload.get('timestamp', 0)
            speaker_id = payload.get('user_id', 0)
            speaker_name = payload.get('user_name', '')
            callback(timestamp, speaker_id, speaker_name)
        except Exception as e:
            log_error('client', f'Failed to parse active speaker event: {e}')

    super().onEventEx(_dispatch)

    wanted = [EVENT_ACTIVE_SPEAKER_CHANGE]
    try:
        self.subscribeEvent(wanted)
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')

    return True

Register a callback for active speaker change events.

This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

Args

callback
Function called with (timestamp, user_id, user_name)

Returns

bool
True if registration succeeds

Example

>>> def on_speaker(timestamp, user_id, user_name):
...     print(f"Active speaker: {user_name} ({user_id})")
>>> client.onActiveSpeakerEvent(on_speaker)
def onParticipantEvent(self, callback: Callable[[str, int, list], None]) ‑> bool
Expand source code
def onParticipantEvent(self, callback: Callable[[str, int, list], None]) -> bool:
    """
    Install a handler for participant join/leave notifications.

    A wrapper is registered through ``onEventEx``; it decodes each raw JSON
    event and calls ``callback`` with parsed values for
    EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE only. The method then
    tries to subscribe to both event types; a failed subscription is logged
    as a warning and does not change the result.

    Args:
        callback: Function called with (event, timestamp, participants) where:
            - event: 'join' or 'leave'
            - timestamp: value of the event's 'timestamp' field (0 if absent)
            - participants: List of dicts with 'user_id' and 'user_name'
              keys (either value may be None when absent from the event)

    Returns:
        bool: Always True once the handler has been installed

    Example:
        >>> def on_participant(event, timestamp, participants):
        ...     print(f"Participant {event}: {participants}")
        >>> client.onParticipantEvent(on_participant)
    """
    def _dispatch(raw_event: str):
        try:
            payload = json.loads(raw_event)
            kind = payload.get('event_type')

            # Translate the event constant into the label given to the callback
            if kind == EVENT_PARTICIPANT_JOIN:
                action = 'join'
            elif kind == EVENT_PARTICIPANT_LEAVE:
                action = 'leave'
            else:
                return

            roster = []
            for entry in payload.get('participants', []):
                roster.append({
                    'user_id': entry.get('user_id'),
                    'user_name': entry.get('user_name'),
                })

            callback(action, payload.get('timestamp', 0), roster)
        except Exception as e:
            log_error('client', f'Failed to parse participant event: {e}')

    super().onEventEx(_dispatch)

    wanted = [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]
    try:
        self.subscribeEvent(wanted)
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to participant events: {e}')

    return True

Register a callback for participant join/leave events.

This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.

Args

callback
Function called with (event, timestamp, participants) where:
  • event: 'join' or 'leave'
  • timestamp: Unix timestamp in milliseconds
  • participants: List of dicts with 'user_id' and optional 'user_name'

Returns

bool
True if registration succeeds

Example

>>> def on_participant(event, timestamp, participants):
...     print(f"Participant {event}: {participants}")
>>> client.onParticipantEvent(on_participant)
def onSharingEvent(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool
Expand source code
def onSharingEvent(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
    """
    Install a handler for screen-sharing start/stop notifications.

    A wrapper is registered through ``onEventEx``; it decodes each raw JSON
    event and calls ``callback`` for EVENT_SHARING_START and
    EVENT_SHARING_STOP only. The method then tries to subscribe to both
    event types; a failed subscription is logged as a warning and does not
    change the result.
    Note: These events only work when the RTMS app has DESKSHARE scope permission.

    Args:
        callback: Function called with (event, timestamp, user_id, user_name) where:
            - event: 'start' or 'stop'
            - timestamp: value of the event's 'timestamp' field (0 if absent)
            - user_id: user ID from the event for 'start'; always None for 'stop'
            - user_name: user name from the event for 'start'; always None for 'stop'

    Returns:
        bool: Always True once the handler has been installed

    Example:
        >>> def on_sharing(event, timestamp, user_id, user_name):
        ...     print(f"Sharing {event} by {user_name}")
        >>> client.onSharingEvent(on_sharing)
    """
    def _dispatch(raw_event: str):
        try:
            payload = json.loads(raw_event)
            kind = payload.get('event_type')

            if kind == EVENT_SHARING_START:
                action = 'start'
                sharer = (payload.get('user_id'), payload.get('user_name'))
            elif kind == EVENT_SHARING_STOP:
                # Stop events never carry the sharer's identity to the callback
                action = 'stop'
                sharer = (None, None)
            else:
                return

            callback(action, payload.get('timestamp', 0), *sharer)
        except Exception as e:
            log_error('client', f'Failed to parse sharing event: {e}')

    super().onEventEx(_dispatch)

    wanted = [EVENT_SHARING_START, EVENT_SHARING_STOP]
    try:
        self.subscribeEvent(wanted)
    except Exception as e:
        log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')

    return True

Register a callback for sharing start/stop events.

This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.

Args

callback
Function called with (event, timestamp, user_id, user_name) where:
  • event: 'start' or 'stop'
  • timestamp: Unix timestamp in milliseconds
  • user_id: Optional user ID (only for 'start')
  • user_name: Optional user name (only for 'start')

Returns

bool
True if registration succeeds

Example

>>> def on_sharing(event, timestamp, user_id, user_name):
...     print(f"Sharing {event} by {user_name}")
>>> client.onSharingEvent(on_sharing)
def on_webhook_event(self, callback=None, port=None, path=None)
Expand source code
def on_webhook_event(self, callback=None, port=None, path=None):
    """
    Register a webhook event handler.

    This can be used as a decorator or a direct method call:

    @client.on_webhook_event(port=8080, path='/webhook')
    def handle_webhook(payload):
        print(f"Received webhook: {payload}")

    client.on_webhook_event(handle_webhook, port=8080, path='/webhook')

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: ``callback`` itself when a callable is passed directly
        (bare decorator or method call); otherwise a decorator that
        registers the function it is applied to and returns it unchanged
    """
    # Bare decorator or direct method call: a callable was handed in
    if callable(callback):
        # Forward port/path as well, so a direct call with explicit
        # settings is not silently started on the defaults
        self._start_webhook_server(callback, port, path)
        return callback

    # Decorator-with-arguments form: defer registration until applied
    def decorator(func):
        self._start_webhook_server(func, port, path)
        return func

    return decorator

Register a webhook event handler.

This can be used as a decorator or a direct method call:

@client.on_webhook_event(port=8080, path='/webhook')
def handle_webhook(payload):
    print(f"Received webhook: {payload}")

Args

callback : callable, optional
Function to call when a webhook is received
port : int, optional
Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path : str, optional
URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

Returns

callable
Decorator function if used as a decorator
def setAudioParams(self, params)
Expand source code
def setAudioParams(self, params):
    """
    Validate audio parameters, then apply them.

    ``params`` is checked by ``_validate_audio_params`` first; it is passed
    on to the base class implementation only if that check returns normally.

    Args:
        params (AudioParams): Audio parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    # Reject bad input before it reaches the base implementation
    _validate_audio_params(params)

    applied = super().setAudioParams(params)
    return applied

Set audio parameters with validation.

Args

params : AudioParams
Audio parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def setDeskshareParams(self, params)
Expand source code
def setDeskshareParams(self, params):
    """
    Validate deskshare parameters, then apply them.

    ``params`` is checked by ``_validate_deskshare_params`` first; it is
    passed on to the base class implementation only if that check returns
    normally.

    Args:
        params (DeskshareParams): Deskshare parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    # Reject bad input before it reaches the base implementation
    _validate_deskshare_params(params)

    applied = super().setDeskshareParams(params)
    return applied

Set deskshare parameters with validation.

Args

params : DeskshareParams
Deskshare parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def setVideoParams(self, params)
Expand source code
def setVideoParams(self, params):
    """
    Validate video parameters, then apply them.

    ``params`` is checked by ``_validate_video_params`` first; it is passed
    on to the base class implementation only if that check returns normally.

    Args:
        params (VideoParams): Video parameters object

    Returns:
        bool: True if parameters were set successfully

    Raises:
        ValueError: If parameters are invalid
    """
    # Reject bad input before it reaches the base implementation
    _validate_video_params(params)

    applied = super().setVideoParams(params)
    return applied

Set video parameters with validation.

Args

params : VideoParams
Video parameters object

Returns

bool
True if parameters were set successfully

Raises

ValueError
If parameters are invalid
def stop(self)
Expand source code
def stop(self):
    """
    Stop the RTMS client and release its resources.

    Alias for leave() under a more intuitive name.

    Returns:
        Whatever leave() returns.
    """
    outcome = self.leave()
    return outcome

Stop RTMS client and release resources.

This is equivalent to calling leave(), but with a more intuitive name.

def subscribeEvent(self, events)
Expand source code
def subscribeEvent(self, events):
    """
    Request delivery of the given event types.

    This forwards directly to the base class implementation.

    Note: onParticipantEvent(), onActiveSpeakerEvent() and onSharingEvent()
    already call this method for their own event types, so explicit
    subscription is not needed when using them.

    Args:
        events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

    Returns:
        bool: True if subscription was successful
    """
    subscribed = super().subscribeEvent(events)
    return subscribed

Subscribe to receive specific event types.

Note: Calling onParticipantEvent() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

Args

events
List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

Returns

bool
True if subscription was successful
def unsubscribeEvent(self, events)
Expand source code
def unsubscribeEvent(self, events):
    """
    Stop delivery of the given event types.

    This forwards directly to the base class implementation.

    Args:
        events: List of event type constants

    Returns:
        bool: True if unsubscription was successful
    """
    unsubscribed = super().unsubscribeEvent(events)
    return unsubscribed

Unsubscribe from specific event types.

Args

events
List of event type constants

Returns

bool
True if unsubscription was successful
class DeskshareParams (...)

__init__(*args, **kwargs) Overloaded function.

  1. __init__(self: rtms._rtms.DeskshareParams) -> None

  2. __init__(self: rtms._rtms.DeskshareParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, resolution: typing.SupportsInt, fps: typing.SupportsInt) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop codec
prop contentType
prop fps
prop resolution
class LogFormat (*args, **kwds)
Expand source code
class LogFormat(str, Enum):
    """Available log output formats.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (for example ``LogFormat.JSON == 'json'``).
    """
    # String value 'progressive'.
    # NOTE(review): presumably human-readable line output — confirm against the logger
    PROGRESSIVE = 'progressive'
    # String value 'json'.
    JSON = 'json'

Available log output formats

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var JSON

JSON log output format (value 'json').

var PROGRESSIVE

Progressive log output format (value 'progressive').

class LogLevel (*args, **kwds)
Expand source code
class LogLevel(IntEnum):
    """Available log levels for RTMS SDK logging.

    Members are integers, numbered from ERROR (0) up to TRACE (4), so they
    can be compared numerically.
    """
    # NOTE(review): presumably a higher value means more verbose output —
    # confirm against the logger's level filtering
    ERROR = 0
    WARN = 1
    INFO = 2
    DEBUG = 3
    TRACE = 4

Available log levels for RTMS SDK logging

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var DEBUG

Debug log level (value 3).

var ERROR

Error log level (value 0).

var INFO

Info log level (value 2).

var TRACE

Trace log level (value 4).

var WARN

Warning log level (value 1).

class Metadata (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop userId
prop userName
class Participant (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop id
prop name
class Session (*args, **kwargs)

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop isActive
prop isPaused
prop meetingId
prop sessionId
prop statTime
prop status
prop streamId
class VideoParams (...)

__init__(*args, **kwargs) Overloaded function.

  1. __init__(self: rtms._rtms.VideoParams) -> None

  2. __init__(self: rtms._rtms.VideoParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, resolution: typing.SupportsInt, data_opt: typing.SupportsInt, fps: typing.SupportsInt) -> None

Ancestors

  • pybind11_builtins.pybind11_object

Instance variables

prop codec
prop contentType
prop dataOpt
prop fps
prop resolution