Package rtms
Functions
def configure_logger(options: dict)-
Expand source code
def configure_logger(options: dict): """Configure the logger with the specified options""" global _log_level, _log_format, _log_enabled if 'level' in options: _log_level = options['level'] if 'format' in options: _log_format = options['format'] if 'enabled' in options: _log_enabled = options['enabled'] log_info('logger', f"Logger configured: level={_log_level}, format={_log_format}, enabled={_log_enabled}") log_debug("rtms", "Importing _rtms module")Configure the logger with the specified options
def generate_signature(client, secret, uuid, rtms_stream_id)-
Expand source code
def generate_signature(client, secret, uuid, rtms_stream_id): """Generate a signature for RTMS authentication""" client_id = os.getenv("ZM_RTMS_CLIENT", client) client_secret = os.getenv("ZM_RTMS_SECRET", secret) if not client_id: raise EnvironmentError("Client ID cannot be blank") elif not client_secret: raise EnvironmentError("Client Secret cannot be blank") message = f"{client_id},{uuid},{rtms_stream_id}" signature = hmac.new( client_secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256 ).hexdigest() return signatureGenerate a signature for RTMS authentication
def initialize(ca_path=None)-
Expand source code
def initialize(ca_path=None): """ Initialize the RTMS SDK. Note: The SDK is automatically initialized when you create a Client instance. You only need to call this if you want to initialize with a custom CA path. Args: ca_path (str, optional): Path to the CA certificate file. Returns: bool: True if initialization was successful """ ca_path = find_ca_certificate(ca_path) try: _ClientBase.initialize(ca_path) Client._sdk_initialized = True return True except Exception as e: log_error("rtms", f"Error initializing RTMS SDK: {e}") return FalseInitialize the RTMS SDK.
Note: The SDK is automatically initialized when you create a Client instance. You only need to call this if you want to initialize with a custom CA path.
Args
ca_path:str, optional- Path to the CA certificate file.
Returns
bool- True if initialization was successful
def log_debug(component, message, details=None)-
Expand source code
def log_debug(component, message, details=None): """Log a debug message""" _log(LogLevel.DEBUG, component, message, details)Log a debug message
def log_error(component, message, details=None)-
Expand source code
def log_error(component, message, details=None): """Log an error message""" _log(LogLevel.ERROR, component, message, details)Log an error message
def log_info(component, message, details=None)-
Expand source code
def log_info(component, message, details=None): """Log an info message""" _log(LogLevel.INFO, component, message, details)Log an info message
def log_warn(component, message, details=None)-
Expand source code
def log_warn(component, message, details=None): """Log a warning message""" _log(LogLevel.WARN, component, message, details)Log a warning message
def onWebhookEvent(callback=None, port=None, path=None)-
Expand source code
def onWebhookEvent(callback=None, port=None, path=None): """ Start a webhook server to receive events from Zoom. This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function. Can be used as a decorator or a direct function call: @rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(...) Args: callback (callable, optional): Function to call when a webhook is received port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080 path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/' Returns: callable: Decorator function if used as a decorator """ global _webhook_server # Determine port and path webhook_port = port or int(os.getenv('ZM_RTMS_PORT', '8080')) webhook_path = path or os.getenv('ZM_RTMS_PATH', '/') # If used as a decorator without arguments if callback is not None and callable(callback): if _webhook_server is None: _webhook_server = WebhookServer(webhook_port, webhook_path) _webhook_server.start(callback) return callback # If used as a decorator with arguments or as a method call def decorator(func): global _webhook_server if _webhook_server is None: _webhook_server = WebhookServer(webhook_port, webhook_path) _webhook_server.start(func) return func return decoratorStart a webhook server to receive events from Zoom.
This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.
Can be used as a decorator or a direct function call:
@rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(…)
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def on_webhook_event(callback=None, port=None, path=None)-
Expand source code
def onWebhookEvent(callback=None, port=None, path=None): """ Start a webhook server to receive events from Zoom. This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function. Can be used as a decorator or a direct function call: @rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(...) Args: callback (callable, optional): Function to call when a webhook is received port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080 path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/' Returns: callable: Decorator function if used as a decorator """ global _webhook_server # Determine port and path webhook_port = port or int(os.getenv('ZM_RTMS_PORT', '8080')) webhook_path = path or os.getenv('ZM_RTMS_PATH', '/') # If used as a decorator without arguments if callback is not None and callable(callback): if _webhook_server is None: _webhook_server = WebhookServer(webhook_port, webhook_path) _webhook_server.start(callback) return callback # If used as a decorator with arguments or as a method call def decorator(func): global _webhook_server if _webhook_server is None: _webhook_server = WebhookServer(webhook_port, webhook_path) _webhook_server.start(func) return func return decoratorStart a webhook server to receive events from Zoom.
This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.
Can be used as a decorator or a direct function call:
@rtms.onWebhookEvent(port=8080, path='/webhook') def handle_webhook(payload): if payload.get('event') == 'meeting.rtms.started': # Create a client and join client = rtms.Client() client.join(…)
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def run(poll_interval: float = 0.01, stop_on_empty: bool = False)-
Expand source code
def run(poll_interval: float = 0.01, stop_on_empty: bool = False): """ Start the RTMS event loop. This function blocks and handles: - Polling all active clients - Processing pending operations from other threads (like webhook handlers) - Graceful shutdown on KeyboardInterrupt Args: poll_interval: Time in seconds between poll cycles (default: 0.01 = 10ms) stop_on_empty: If True, stop when no clients remain (default: False) Example: >>> import rtms >>> >>> clients = {} >>> >>> @rtms.onWebhookEvent >>> def handle(payload): >>> client = rtms.Client() >>> clients[payload['payload']['rtms_stream_id']] = client >>> client.onTranscriptData(lambda d,s,t,m: print(m.userName, d)) >>> client.join(payload['payload']) >>> >>> rtms.run() # Blocks until interrupted """ global _main_thread_id, _running _main_thread_id = threading.get_ident() _running = True _stop_event.clear() log_info('rtms', f'Starting RTMS event loop (poll_interval={poll_interval}s)') try: while _running and not _stop_event.is_set(): # Process pending operations from other threads _process_pending_operations() # Poll all active clients with _clients_lock: clients_to_poll = list(_clients.values()) for client in clients_to_poll: if client._running: try: client.poll() except Exception as e: log_error('rtms', f'Error polling client: {e}') # Check stop_on_empty condition if stop_on_empty and not clients_to_poll: log_info('rtms', 'No active clients, stopping event loop') break time.sleep(poll_interval) except KeyboardInterrupt: log_info('rtms', 'Received interrupt, shutting down...') finally: _running = False _main_thread_id = None _cleanup_all_clients()Start the RTMS event loop.
This function blocks and handles: - Polling all active clients - Processing pending operations from other threads (like webhook handlers) - Graceful shutdown on KeyboardInterrupt
Args
poll_interval- Time in seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty- If True, stop when no clients remain (default: False)
Example
>>> import rtms >>> >>> clients = {} >>> >>> @rtms.onWebhookEvent >>> def handle(payload): >>> client = rtms.Client() >>> clients[payload['payload']['rtms_stream_id']] = client >>> client.onTranscriptData(lambda d,s,t,m: print(m.userName, d)) >>> client.join(payload['payload']) >>> >>> rtms.run() # Blocks until interrupted def stop()-
Expand source code
def stop(): """ Signal the event loop to stop. Call this from another thread to gracefully stop the rtms.run() loop. """ global _running _running = False _stop_event.set() log_info('rtms', 'Stop signal received')Signal the event loop to stop.
Call this from another thread to gracefully stop the rtms.run() loop.
def uninitialize()-
Expand source code
def uninitialize(): """ Uninitialize the RTMS SDK. Call this when you're done using the SDK to release resources. """ try: _ClientBase.uninitialize() Client._sdk_initialized = False return True except Exception as e: log_error("rtms", f"Error uninitializing RTMS SDK: {e}") traceback.print_exc() return FalseUninitialize the RTMS SDK.
Call this when you're done using the SDK to release resources.
Classes
class AudioParams (...)-
init(args, *kwargs) Overloaded function.
-
init(self: rtms._rtms.AudioParams) -> None
-
init(self: rtms._rtms.AudioParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, sample_rate: typing.SupportsInt, channel: typing.SupportsInt, data_opt: typing.SupportsInt, duration: typing.SupportsInt, frame_size: typing.SupportsInt) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop channelprop codecprop contentTypeprop dataOptprop durationprop frameSizeprop sampleRate
-
class Client-
Expand source code
class Client(_ClientBase): """ RTMS Client - provides real-time media streaming capabilities This client allows you to join Zoom meetings and process audio, video, and transcript data in real-time. """ _sdk_initialized = False def __init__(self): """Initialize a new RTMS client""" # Ensure SDK is initialized before creating client instance if not Client._sdk_initialized: try: ca_path = find_ca_certificate() log_debug("client", f"Initializing SDK with CA: {ca_path}") _ClientBase.initialize(ca_path, 1, "python-rtms") Client._sdk_initialized = True log_debug("client", "SDK initialized successfully") except Exception as e: log_error("client", f"SDK initialization failed: {e}") # Try with empty path as fallback try: log_debug("client", "Trying SDK initialization with empty CA path") _ClientBase.initialize("", 1, "python-rtms") Client._sdk_initialized = True log_debug("client", "SDK initialized with empty CA path") except Exception as e2: log_error("client", f"SDK initialization failed completely: {e2}") raise RuntimeError(f"Failed to initialize RTMS SDK: {e2}") super().__init__() self._polling_thread = None self._polling_interval = 10 # milliseconds self._running = False self._webhook_server = None # Register with global client registry with _clients_lock: _clients[id(self)] = self def join(self, meeting_uuid: str = None, session_id: str = None, rtms_stream_id: str = None, server_urls: str = None, signature: str = None, timeout: int = -1, ca: str = None, client: str = None, secret: str = None, poll_interval: int = 10, **kwargs): """ Join a RTMS session. Can be called with positional arguments or with a dictionary of parameters. Args: meeting_uuid (str): Meeting UUID (for Meeting SDK events) session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided rtms_stream_id (str): RTMS stream ID server_urls (str): Server URLs (comma-separated) signature (str, optional): Authentication signature. If not provided, will be generated timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout). ca (str, optional): CA certificate path. Defaults to system CA files. client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var. secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var. poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10. **kwargs: Additional arguments passed to join Returns: bool: True if joined successfully, False otherwise """ try: # Support for both dictionary-style and parameter-style calls if len(kwargs) > 0: # If additional kwargs are provided, merge them with the named parameters params = { 'meeting_uuid': meeting_uuid, 'session_id': session_id, 'rtms_stream_id': rtms_stream_id, 'server_urls': server_urls, 'signature': signature, 'timeout': timeout, 'ca': ca, 'client': client, 'secret': secret, 'poll_interval': poll_interval } # Update with any additional kwargs params.update(kwargs) return self._join_with_params(**params) # Check if uuid is actually a dictionary (first param) if isinstance(meeting_uuid, dict): return self._join_with_params(**meeting_uuid) # Otherwise, use the parameters directly return self._join_with_params( meeting_uuid=meeting_uuid, session_id=session_id, rtms_stream_id=rtms_stream_id, server_urls=server_urls, signature=signature, timeout=timeout, ca=ca, client=client, secret=secret, poll_interval=poll_interval ) except Exception as e: log_error("client", f"Error in join: {e}") traceback.print_exc() return False def _join_with_params(self, **params): """ Internal method to join with parameter dictionary. IMPORTANT: Due to SDK threading constraints, the actual join() must be called from the same thread that runs the event loop. If rtms.run() hasn't been called, we proceed directly (backwards compatible). Otherwise, we queue for main thread. """ # If rtms.run() hasn't been called yet, proceed directly (backwards compatible) if _main_thread_id is None: return self._do_join(**params) # If on main thread (where rtms.run() is executing), join directly if threading.get_ident() == _main_thread_id: return self._do_join(**params) # Queue the join for main thread execution log_debug("client", "Join called from non-main thread, queuing request") with _pending_lock: _pending_operations.append((self._do_join, (), params)) log_debug("client", "Join request queued, will be processed by rtms.run()") return True # Return immediately; actual join happens later def _do_join(self, **params): """Actually perform the join - always called on main thread or when no event loop""" try: # Extract parameters with defaults meeting_uuid = params.get('meeting_uuid') session_id = params.get('session_id') rtms_stream_id = params.get('rtms_stream_id') server_urls = params.get('server_urls') signature = params.get('signature') timeout = params.get('timeout', -1) ca = params.get('ca') client = params.get('client', os.getenv('ZM_RTMS_CLIENT')) secret = params.get('secret', os.getenv('ZM_RTMS_SECRET')) poll_interval = params.get('poll_interval', 10) # Use meeting_uuid for Meeting SDK events, session_id for Video SDK events instance_id = meeting_uuid or session_id if not instance_id: raise ValueError("Either meeting_uuid or session_id is required") if not rtms_stream_id: raise ValueError("RTMS Stream ID is required") if not server_urls: raise ValueError("Server URLs is required") # Generate signature if not provided if not signature: try: signature = generate_signature(client, secret, instance_id, rtms_stream_id) except Exception as e: log_error("client", f"Error generating signature: {e}") raise # Store polling interval self._polling_interval = poll_interval # Join the meeting/session log_info("client", f"Joining {'meeting' if meeting_uuid else 'session'}: {instance_id}") super().join(instance_id, rtms_stream_id, signature, server_urls, timeout) # Start polling thread self._start_polling() log_info("client", "Successfully joined") return True except Exception as e: log_error("client", f"Error joining: {e}") traceback.print_exc() return False def _initialize_rtms(self, ca_path=None): """Initialize the RTMS SDK with the best available CA certificate""" try: # Find the best CA certificate ca_path = find_ca_certificate(ca_path) # Initialize the SDK log_debug("client", f"Initializing RTMS with CA: {ca_path}") _ClientBase.initialize(ca_path) return True except Exception as e: log_error("client", f"Error initializing RTMS: {e}") traceback.print_exc() # Try with an empty path as a last resort try: log_debug("client", "Trying initialization with empty CA path") _ClientBase.initialize("") return True except Exception as e2: log_error("client", f"Failed to initialize with empty CA path: {e2}") raise e # Raise the original error def _poll_if_needed(self): """ Poll the RTMS client if needed. IMPORTANT: Due to SDK threading constraints, poll() must be called from the main thread (same thread that initialized the SDK). This should be called periodically from the main loop. """ if self._running: try: super().poll() except Exception as e: log_error("client", f"Error during polling: {e}") def _start_polling(self): """Mark that polling should begin (will be done from main thread)""" self._running = True log_debug("client", "Polling enabled - call _poll_if_needed() from main loop") def _stop_polling(self): """Stop polling""" self._running = False log_debug("client", "Polling stopped") def stop(self): """ Stop RTMS client and release resources. This is equivalent to calling leave(), but with a more intuitive name. """ return self.leave() def setAudioParams(self, params): """ Set audio parameters with validation. Args: params (AudioParams): Audio parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_audio_params(params) return super().setAudioParams(params) def setVideoParams(self, params): """ Set video parameters with validation. Args: params (VideoParams): Video parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_video_params(params) return super().setVideoParams(params) def setDeskshareParams(self, params): """ Set deskshare parameters with validation. Args: params (DeskshareParams): Deskshare parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_deskshare_params(params) return super().setDeskshareParams(params) def subscribeEvent(self, events): """ Subscribe to receive specific event types. Note: Calling onParticipantEvent() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events. Args: events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) Returns: bool: True if subscription was successful """ return super().subscribeEvent(events) def unsubscribeEvent(self, events): """ Unsubscribe from specific event types. Args: events: List of event type constants Returns: bool: True if unsubscription was successful """ return super().unsubscribeEvent(events) def onParticipantEvent(self, callback: Callable[[str, int, list], None]) -> bool: """ Register a callback for participant join/leave events. This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON. Args: callback: Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name' Returns: bool: True if registration succeeds Example: >>> def on_participant(event, timestamp, participants): ... print(f"Participant {event}: {participants}") >>> client.onParticipantEvent(on_participant) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_PARTICIPANT_JOIN: participants = [ {'user_id': p.get('user_id'), 'user_name': p.get('user_name')} for p in data.get('participants', []) ] callback('join', data.get('timestamp', 0), participants) elif event_type == EVENT_PARTICIPANT_LEAVE: participants = [ {'user_id': p.get('user_id'), 'user_name': p.get('user_name')} for p in data.get('participants', []) ] callback('leave', data.get('timestamp', 0), participants) except Exception as e: log_error('client', f'Failed to parse participant event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to participant events: {e}') return True def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) -> bool: """ Register a callback for active speaker change events. This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE. Args: callback: Function called with (timestamp, user_id, user_name) Returns: bool: True if registration succeeds Example: >>> def on_speaker(timestamp, user_id, user_name): ... print(f"Active speaker: {user_name} ({user_id})") >>> client.onActiveSpeakerEvent(on_speaker) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_ACTIVE_SPEAKER_CHANGE: callback( data.get('timestamp', 0), data.get('user_id', 0), data.get('user_name', '') ) except Exception as e: log_error('client', f'Failed to parse active speaker event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_ACTIVE_SPEAKER_CHANGE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}') return True def onSharingEvent(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool: """ Register a callback for sharing start/stop events. This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission. Args: callback: Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start') Returns: bool: True if registration succeeds Example: >>> def on_sharing(event, timestamp, user_id, user_name): ... print(f"Sharing {event} by {user_name}") >>> client.onSharingEvent(on_sharing) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_SHARING_START: callback( 'start', data.get('timestamp', 0), data.get('user_id'), data.get('user_name') ) elif event_type == EVENT_SHARING_STOP: callback( 'stop', data.get('timestamp', 0), None, None ) except Exception as e: log_error('client', f'Failed to parse sharing event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_SHARING_START, EVENT_SHARING_STOP]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to sharing events: {e}') return True def leave(self): """ Leave the RTMS session and stop all threads. Returns: bool: True if left successfully """ log_info("client", "Leaving RTMS session") # Stop polling thread self._stop_polling() # Unregister from global client registry with _clients_lock: _clients.pop(id(self), None) # Stop webhook server if we have one if self._webhook_server: self._webhook_server.stop() self._webhook_server = None try: # Release RTMS resources super().release() return True except Exception as e: log_error("client", f"Error releasing RTMS resources: {e}") traceback.print_exc() return False def on_webhook_event(self, callback=None, port=None, path=None): """ Register a webhook event handler. This can be used as a decorator or a direct method call: @client.on_webhook_event(port=8080, path='/webhook') def handle_webhook(payload): print(f"Received webhook: {payload}") Args: callback (callable, optional): Function to call when a webhook is received port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080 path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/' Returns: callable: Decorator function if used as a decorator """ # If used as a decorator without arguments if callback is not None and callable(callback): # Start webhook server with provided callback self._start_webhook_server(callback) return callback # If used as a decorator with arguments or as a method call def decorator(func): self._start_webhook_server(func, port, path) return func return decorator def _start_webhook_server(self, callback, port=None, path=None): """ Start the webhook server Args: callback (callable): Function to call when a webhook is received port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080 path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/' """ if not self._webhook_server: port = port or int(os.getenv('ZM_RTMS_PORT', '8080')) path = path or os.getenv('ZM_RTMS_PATH', '/') self._webhook_server = WebhookServer(port, path) self._webhook_server.start(callback)RTMS Client - provides real-time media streaming capabilities
This client allows you to join Zoom meetings and process audio, video, and transcript data in real-time.
Initialize a new RTMS client
Ancestors
- rtms._rtms.Client
- pybind11_builtins.pybind11_object
Methods
def join(self,
meeting_uuid: str = None,
session_id: str = None,
rtms_stream_id: str = None,
server_urls: str = None,
signature: str = None,
timeout: int = -1,
ca: str = None,
client: str = None,
secret: str = None,
poll_interval: int = 10,
**kwargs)-
Expand source code
def join(self, meeting_uuid: str = None, session_id: str = None, rtms_stream_id: str = None, server_urls: str = None, signature: str = None, timeout: int = -1, ca: str = None, client: str = None, secret: str = None, poll_interval: int = 10, **kwargs): """ Join a RTMS session. Can be called with positional arguments or with a dictionary of parameters. Args: meeting_uuid (str): Meeting UUID (for Meeting SDK events) session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided rtms_stream_id (str): RTMS stream ID server_urls (str): Server URLs (comma-separated) signature (str, optional): Authentication signature. If not provided, will be generated timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout). ca (str, optional): CA certificate path. Defaults to system CA files. client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var. secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var. poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10. **kwargs: Additional arguments passed to join Returns: bool: True if joined successfully, False otherwise """ try: # Support for both dictionary-style and parameter-style calls if len(kwargs) > 0: # If additional kwargs are provided, merge them with the named parameters params = { 'meeting_uuid': meeting_uuid, 'session_id': session_id, 'rtms_stream_id': rtms_stream_id, 'server_urls': server_urls, 'signature': signature, 'timeout': timeout, 'ca': ca, 'client': client, 'secret': secret, 'poll_interval': poll_interval } # Update with any additional kwargs params.update(kwargs) return self._join_with_params(**params) # Check if uuid is actually a dictionary (first param) if isinstance(meeting_uuid, dict): return self._join_with_params(**meeting_uuid) # Otherwise, use the parameters directly return self._join_with_params( meeting_uuid=meeting_uuid, session_id=session_id, rtms_stream_id=rtms_stream_id, server_urls=server_urls, signature=signature, timeout=timeout, ca=ca, client=client, secret=secret, poll_interval=poll_interval ) except Exception as e: log_error("client", f"Error in join: {e}") traceback.print_exc() return FalseJoin a RTMS session.
Can be called with positional arguments or with a dictionary of parameters.
Args
meeting_uuid:str- Meeting UUID (for Meeting SDK events)
session_id:str- Session ID (for Video SDK events) - used when meeting_uuid is not provided
rtms_stream_id:str- RTMS stream ID
server_urls:str- Server URLs (comma-separated)
signature:str, optional- Authentication signature. If not provided, will be generated
timeout:int, optional- Timeout in milliseconds. Defaults to -1 (no timeout).
ca:str, optional- CA certificate path. Defaults to system CA files.
client:str, optional- Client ID. If empty, uses ZM_RTMS_CLIENT env var.
secret:str, optional- Client secret. If empty, uses ZM_RTMS_SECRET env var.
poll_interval:int, optional- Polling interval in milliseconds. Defaults to 10.
**kwargs- Additional arguments passed to join
Returns
bool- True if joined successfully, False otherwise
def leave(self)-
Expand source code
def leave(self): """ Leave the RTMS session and stop all threads. Returns: bool: True if left successfully """ log_info("client", "Leaving RTMS session") # Stop polling thread self._stop_polling() # Unregister from global client registry with _clients_lock: _clients.pop(id(self), None) # Stop webhook server if we have one if self._webhook_server: self._webhook_server.stop() self._webhook_server = None try: # Release RTMS resources super().release() return True except Exception as e: log_error("client", f"Error releasing RTMS resources: {e}") traceback.print_exc() return FalseLeave the RTMS session and stop all threads.
Returns
bool- True if left successfully
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) ‑> bool-
Expand source code
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) -> bool: """ Register a callback for active speaker change events. This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE. Args: callback: Function called with (timestamp, user_id, user_name) Returns: bool: True if registration succeeds Example: >>> def on_speaker(timestamp, user_id, user_name): ... print(f"Active speaker: {user_name} ({user_id})") >>> client.onActiveSpeakerEvent(on_speaker) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_ACTIVE_SPEAKER_CHANGE: callback( data.get('timestamp', 0), data.get('user_id', 0), data.get('user_name', '') ) except Exception as e: log_error('client', f'Failed to parse active speaker event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_ACTIVE_SPEAKER_CHANGE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}') return TrueRegister a callback for active speaker change events.
This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.
Args
callback- Function called with (timestamp, user_id, user_name)
Returns
bool- True if registration succeeds
Example
>>> def on_speaker(timestamp, user_id, user_name): ... print(f"Active speaker: {user_name} ({user_id})") >>> client.onActiveSpeakerEvent(on_speaker) def onParticipantEvent(self, callback: Callable[[str, int, list], None]) ‑> bool-
Expand source code
def onParticipantEvent(self, callback: Callable[[str, int, list], None]) -> bool: """ Register a callback for participant join/leave events. This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON. Args: callback: Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name' Returns: bool: True if registration succeeds Example: >>> def on_participant(event, timestamp, participants): ... print(f"Participant {event}: {participants}") >>> client.onParticipantEvent(on_participant) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_PARTICIPANT_JOIN: participants = [ {'user_id': p.get('user_id'), 'user_name': p.get('user_name')} for p in data.get('participants', []) ] callback('join', data.get('timestamp', 0), participants) elif event_type == EVENT_PARTICIPANT_LEAVE: participants = [ {'user_id': p.get('user_id'), 'user_name': p.get('user_name')} for p in data.get('participants', []) ] callback('leave', data.get('timestamp', 0), participants) except Exception as e: log_error('client', f'Failed to parse participant event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to participant events: {e}') return TrueRegister a callback for participant join/leave events.
This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.
Args
callback- Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name'
Returns
bool- True if registration succeeds
Example
>>> def on_participant(event, timestamp, participants): ... print(f"Participant {event}: {participants}") >>> client.onParticipantEvent(on_participant) def onSharingEvent(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool-
Expand source code
def onSharingEvent(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool: """ Register a callback for sharing start/stop events. This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission. Args: callback: Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start') Returns: bool: True if registration succeeds Example: >>> def on_sharing(event, timestamp, user_id, user_name): ... print(f"Sharing {event} by {user_name}") >>> client.onSharingEvent(on_sharing) """ def event_handler(event_data: str): try: data = json.loads(event_data) event_type = data.get('event_type') if event_type == EVENT_SHARING_START: callback( 'start', data.get('timestamp', 0), data.get('user_id'), data.get('user_name') ) elif event_type == EVENT_SHARING_STOP: callback( 'stop', data.get('timestamp', 0), None, None ) except Exception as e: log_error('client', f'Failed to parse sharing event: {e}') super().onEventEx(event_handler) try: self.subscribeEvent([EVENT_SHARING_START, EVENT_SHARING_STOP]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to sharing events: {e}') return TrueRegister a callback for sharing start/stop events.
This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.
Args
callback- Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start')
Returns
bool- True if registration succeeds
Example
>>> def on_sharing(event, timestamp, user_id, user_name): ... print(f"Sharing {event} by {user_name}") >>> client.onSharingEvent(on_sharing) def on_webhook_event(self, callback=None, port=None, path=None)-
Expand source code
def on_webhook_event(self, callback=None, port=None, path=None): """ Register a webhook event handler. This can be used as a decorator or a direct method call: @client.on_webhook_event(port=8080, path='/webhook') def handle_webhook(payload): print(f"Received webhook: {payload}") Args: callback (callable, optional): Function to call when a webhook is received port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080 path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/' Returns: callable: Decorator function if used as a decorator """ # If used as a decorator without arguments if callback is not None and callable(callback): # Start webhook server with provided callback self._start_webhook_server(callback) return callback # If used as a decorator with arguments or as a method call def decorator(func): self._start_webhook_server(func, port, path) return func return decoratorRegister a webhook event handler.
This can be used as a decorator or a direct method call:
@client.on_webhook_event(port=8080, path='/webhook') def handle_webhook(payload): print(f"Received webhook: {payload}")
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def setAudioParams(self, params)-
Expand source code
def setAudioParams(self, params): """ Set audio parameters with validation. Args: params (AudioParams): Audio parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_audio_params(params) return super().setAudioParams(params)Set audio parameters with validation.
Args
params:AudioParams- Audio parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
-
Expand source code
def setDeskshareParams(self, params): """ Set deskshare parameters with validation. Args: params (DeskshareParams): Deskshare parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_deskshare_params(params) return super().setDeskshareParams(params)Set deskshare parameters with validation.
Args
params:DeskshareParams- Deskshare parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def setVideoParams(self, params)-
Expand source code
def setVideoParams(self, params): """ Set video parameters with validation. Args: params (VideoParams): Video parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_video_params(params) return super().setVideoParams(params)Set video parameters with validation.
Args
params:VideoParams- Video parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def stop(self)-
Expand source code
def stop(self): """ Stop RTMS client and release resources. This is equivalent to calling leave(), but with a more intuitive name. """ return self.leave()Stop RTMS client and release resources.
This is equivalent to calling leave(), but with a more intuitive name.
def subscribeEvent(self, events)-
Expand source code
def subscribeEvent(self, events): """ Subscribe to receive specific event types. Note: Calling onParticipantEvent() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events. Args: events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) Returns: bool: True if subscription was successful """ return super().subscribeEvent(events)Subscribe to receive specific event types.
Note: Calling onParticipantEvent() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.
Args
events- List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
Returns
bool- True if subscription was successful
def unsubscribeEvent(self, events)-
Expand source code
def unsubscribeEvent(self, events): """ Unsubscribe from specific event types. Args: events: List of event type constants Returns: bool: True if unsubscription was successful """ return super().unsubscribeEvent(events)Unsubscribe from specific event types.
Args
events- List of event type constants
Returns
bool- True if unsubscription was successful
-
init(args, *kwargs) Overloaded function.
-
init(self: rtms._rtms.DeskshareParams) -> None
-
init(self: rtms._rtms.DeskshareParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, resolution: typing.SupportsInt, fps: typing.SupportsInt) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
-
class LogFormat (*args, **kwds)-
Expand source code
class LogFormat(str, Enum): """Available log output formats""" PROGRESSIVE = 'progressive' JSON = 'json'Available log output formats
Ancestors
- builtins.str
- enum.Enum
Class variables
var JSON-
The type of the None singleton.
var PROGRESSIVE-
The type of the None singleton.
class LogLevel (*args, **kwds)-
Expand source code
class LogLevel(IntEnum): """Available log levels for RTMS SDK logging""" ERROR = 0 WARN = 1 INFO = 2 DEBUG = 3 TRACE = 4Available log levels for RTMS SDK logging
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var DEBUG-
The type of the None singleton.
var ERROR-
The type of the None singleton.
var INFO-
The type of the None singleton.
var TRACE-
The type of the None singleton.
var WARN-
The type of the None singleton.
class Metadata (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop userIdprop userName
class Participant (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop idprop name
class Session (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop isActiveprop isPausedprop meetingIdprop sessionIdprop statTimeprop statusprop streamId
class VideoParams (...)-
init(args, *kwargs) Overloaded function.
-
init(self: rtms._rtms.VideoParams) -> None
-
init(self: rtms._rtms.VideoParams, content_type: typing.SupportsInt, codec: typing.SupportsInt, resolution: typing.SupportsInt, data_opt: typing.SupportsInt, fps: typing.SupportsInt) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop codecprop contentTypeprop dataOptprop fpsprop resolution
-