Package rtms
Functions
def configure_logger(options: dict)-
Expand source code
def configure_logger(options: dict):
    """Apply logger settings taken from ``options``.

    Recognised keys are ``'level'``, ``'format'`` and ``'enabled'``.  A key
    that is present overwrites the matching module-level setting; a key that
    is absent leaves the current value untouched.  The resulting
    configuration is then reported through ``log_info``.

    Args:
        options: Mapping holding any subset of the keys listed above.
    """
    global _log_level, _log_format, _log_enabled

    # Falling back to the current value is the same as "assign only when the
    # key is present".
    _log_level = options.get('level', _log_level)
    _log_format = options.get('format', _log_format)
    _log_enabled = options.get('enabled', _log_enabled)

    summary = f"Logger configured: level={_log_level}, format={_log_format}, enabled={_log_enabled}"
    log_info('logger', summary)
    # NOTE(review): this message looks like it belongs to the module import
    # section rather than to logger configuration -- confirm the intended
    # indentation in the real file.
    log_debug("rtms", "Importing _rtms module")
def generate_signature(client, secret, uuid, rtms_stream_id)-
Expand source code
def generate_signature(client, secret, uuid, rtms_stream_id):
    """Generate the HMAC-SHA256 signature used for RTMS authentication.

    The signed message is ``"{client_id},{uuid},{rtms_stream_id}"`` and the
    HMAC key is the client secret; both are encoded as UTF-8.

    Args:
        client: Client ID. When empty or None, the ``ZM_RTMS_CLIENT``
            environment variable is used instead.
        secret: Client secret. When empty or None, the ``ZM_RTMS_SECRET``
            environment variable is used instead.
        uuid: Meeting/session UUID placed in the signed message.
        rtms_stream_id: RTMS stream ID placed in the signed message.

    Returns:
        str: Hexadecimal digest of the signature.

    Raises:
        EnvironmentError: If no client ID or no client secret is available
            from either the arguments or the environment.
    """
    # Explicit arguments win; the environment is only a fallback.  The old
    # ``os.getenv(name, arg)`` form let the environment shadow the caller's
    # value, so an env var set to "" rejected perfectly valid arguments.
    client_id = client or os.getenv("ZM_RTMS_CLIENT")
    client_secret = secret or os.getenv("ZM_RTMS_SECRET")

    if not client_id:
        raise EnvironmentError("Client ID cannot be blank")
    if not client_secret:
        raise EnvironmentError("Client Secret cannot be blank")

    message = f"{client_id},{uuid},{rtms_stream_id}"
    return hmac.new(
        client_secret.encode('utf-8'),
        message.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()
def initialize(ca_path=None)-
Expand source code
def initialize(ca_path=None):
    """
    Initialize the RTMS SDK.

    Note:
        The SDK is automatically initialized when you create a Client
        instance. Call this only when a custom CA path is required.

    Args:
        ca_path (str, optional): Path to the CA certificate file. The value
            is passed through ``find_ca_certificate`` before use.

    Returns:
        bool: True if initialization was successful, False if the SDK raised
        (the error is logged, not propagated).
    """
    resolved_ca = find_ca_certificate(ca_path)
    try:
        _ClientBase.initialize(resolved_ca)
        Client._sdk_initialized = True
    except Exception as exc:
        log_error("rtms", f"Error initializing RTMS SDK: {exc}")
        return False
    return True
Note: The SDK is automatically initialized when you create a Client instance. You only need to call this if you want to initialize with a custom CA path.
Args
ca_path:str, optional- Path to the CA certificate file.
Returns
bool- True if initialization was successful
def log_debug(component, message, details=None)-
Expand source code
def log_debug(component, message, details=None):
    """Forward ``message`` for ``component`` to ``_log`` at DEBUG level."""
    severity = LogLevel.DEBUG
    _log(severity, component, message, details)
def log_error(component, message, details=None)-
Expand source code
def log_error(component, message, details=None):
    """Forward ``message`` for ``component`` to ``_log`` at ERROR level."""
    severity = LogLevel.ERROR
    _log(severity, component, message, details)
def log_info(component, message, details=None)-
Expand source code
def log_info(component, message, details=None):
    """Forward ``message`` for ``component`` to ``_log`` at INFO level."""
    severity = LogLevel.INFO
    _log(severity, component, message, details)
def log_warn(component, message, details=None)-
Expand source code
def log_warn(component, message, details=None):
    """Forward ``message`` for ``component`` to ``_log`` at WARN level."""
    severity = LogLevel.WARN
    _log(severity, component, message, details)
def onWebhookEvent(callback=None, port=None, path=None)-
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Start a webhook server to receive events from Zoom.

    A single module-level ``WebhookServer`` is created on first use and
    reused afterwards, so the port and path of later calls are ignored once
    a server exists. The handler is passed to the server's ``start()``.

    Can be used as a bare decorator, a decorator with arguments, or a direct
    function call:

        @rtms.onWebhookEvent(port=8080, path='/webhook')
        def handle_webhook(payload):
            if payload.get('event') == 'meeting.rtms.started':
                # Create a client and join
                client = rtms.Client()
                client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: ``callback`` itself when a callable was supplied, otherwise
        a decorator that registers the function it is applied to.
    """
    # Resolve the listening address up front, exactly once per call.
    listen_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    listen_path = path or os.getenv('ZM_RTMS_PATH', '/')

    def decorator(func):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(listen_port, listen_path)
        _webhook_server.start(func)
        return func

    # Bare-decorator / direct-call form: register the handler immediately.
    if callable(callback):
        return decorator(callback)

    # Decorator-with-arguments form: registration happens when it is applied.
    return decorator
This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.
Can be used as a decorator or a direct function call:
@rtms.onWebhookEvent(port=8080, path='/webhook')
def handle_webhook(payload):
    if payload.get('event') == 'meeting.rtms.started':
        # Create a client and join
        client = rtms.Client()
        client.join(...)
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def on_webhook_event(callback=None, port=None, path=None)-
Expand source code
def onWebhookEvent(callback=None, port=None, path=None):
    """
    Start a webhook server to receive events from Zoom.

    A single module-level ``WebhookServer`` is created on first use and
    reused afterwards, so the port and path of later calls are ignored once
    a server exists. The handler is passed to the server's ``start()``.

    Can be used as a bare decorator, a decorator with arguments, or a direct
    function call:

        @rtms.onWebhookEvent(port=8080, path='/webhook')
        def handle_webhook(payload):
            if payload.get('event') == 'meeting.rtms.started':
                # Create a client and join
                client = rtms.Client()
                client.join(...)

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: ``callback`` itself when a callable was supplied, otherwise
        a decorator that registers the function it is applied to.
    """
    # Resolve the listening address up front, exactly once per call.
    listen_port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
    listen_path = path or os.getenv('ZM_RTMS_PATH', '/')

    def decorator(func):
        global _webhook_server
        if _webhook_server is None:
            _webhook_server = WebhookServer(listen_port, listen_path)
        _webhook_server.start(func)
        return func

    # Bare-decorator / direct-call form: register the handler immediately.
    if callable(callback):
        return decorator(callback)

    # Decorator-with-arguments form: registration happens when it is applied.
    return decorator
This function creates an HTTP server that listens for webhook events from Zoom. When a webhook event is received, it parses the JSON payload and passes it to the provided callback function.
Can be used as a decorator or a direct function call:
@rtms.onWebhookEvent(port=8080, path='/webhook')
def handle_webhook(payload):
    if payload.get('event') == 'meeting.rtms.started':
        # Create a client and join
        client = rtms.Client()
        client.join(...)
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def run(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None)-
Expand source code
def run(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None) -> None:
    """
    Start the default RTMS event loop (blocking).

    Clients that call join() without an explicit EventLoop are automatically
    routed to this default loop. Blocks until interrupted or stop() is called.

    A fresh ``EventLoop`` named ``'rtms-default'`` is created on every call
    and published in module-level state; that state is cleared again when the
    loop exits, whether it returns normally or raises.

    Args:
        poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
        stop_on_empty: Stop automatically when all clients have left
        executor: Optional concurrent.futures.Executor for dispatching data
            callbacks on all clients that don't have their own executor set.

    Example::

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client()
            client.on_transcript_data(lambda d,s,t,m: print(m.userName, d))
            client.join(payload['payload'])

        rtms.run()  # blocks until Ctrl-C
    """
    global _default_loop, _running, _run_executor
    # Publish the executor and the loop in module globals before running.
    _run_executor = executor
    _default_loop = EventLoop(poll_interval=poll_interval, name='rtms-default')
    _running = True
    # Reset the stop flag left behind by a previous stop() call.
    _stop_event.clear()
    try:
        _default_loop.run(stop_on_empty=stop_on_empty)
    finally:
        # Always drop the module-level state, even if the loop raised.
        _running = False
        _default_loop = None
        _run_executor = None
Clients that call join() without an explicit EventLoop are automatically routed to this default loop. Blocks until interrupted or stop() is called.
Args
poll_interval- Seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty- Stop automatically when all clients have left
executor- Optional concurrent.futures.Executor for dispatching data callbacks on all clients that don't have their own executor set.
Example::
@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client()
    client.on_transcript_data(lambda d,s,t,m: print(m.userName, d))
    client.join(payload['payload'])

rtms.run()  # blocks until Ctrl-C

async def run_async(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None)-
Expand source code
async def run_async(poll_interval: float = 0.01, stop_on_empty: bool = False, executor=None) -> None:
    """
    Start the default RTMS event loop as an asyncio coroutine.

    Drop-in async replacement for rtms.run(). Yields to the asyncio event
    loop between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg)
    run freely.

    A fresh ``EventLoop`` named ``'rtms-default'`` is created on every call
    and published in module-level state; that state is cleared again when the
    awaited loop finishes, whether it returns normally or raises.

    Args:
        poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
        stop_on_empty: Stop automatically when all clients have left
        executor: Optional concurrent.futures.Executor for dispatching data
            callbacks on all clients that don't have their own executor set.

    Example::

        async def main():
            await asyncio.gather(rtms.run_async(), aiohttp_app.start())

        asyncio.run(main())
    """
    global _default_loop, _running, _run_executor
    # Publish the executor and the loop in module globals before running.
    _run_executor = executor
    _default_loop = EventLoop(poll_interval=poll_interval, name='rtms-default')
    _running = True
    # Reset the stop flag left behind by a previous stop() call.
    _stop_event.clear()
    try:
        await _default_loop.run_async(stop_on_empty=stop_on_empty)
    finally:
        # Always drop the module-level state, even if the loop raised.
        _running = False
        _default_loop = None
        _run_executor = None
Drop-in async replacement for rtms.run(). Yields to the asyncio event loop between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg) run freely.
Args
poll_interval- Seconds between poll cycles (default: 0.01 = 10ms)
stop_on_empty- Stop automatically when all clients have left
executor- Optional concurrent.futures.Executor for dispatching data callbacks on all clients that don't have their own executor set.
Example::
async def main():
    await asyncio.gather(rtms.run_async(), aiohttp_app.start())

asyncio.run(main())

def stop()-
Expand source code
def stop() -> None:
    """
    Stop the default event loop (rtms.run() or rtms.run_async()).

    Clears the module-level running flag, sets the stop event, and calls
    ``stop()`` on the default loop when one is currently set.

    For EventLoop or EventLoopPool instances, call their .stop() method directly.
    """
    global _running
    _running = False
    _stop_event.set()
    # _default_loop is None whenever run()/run_async() is not active.
    if _default_loop:
        _default_loop.stop()
    log_info('rtms', 'Stop signal received')
For EventLoop or EventLoopPool instances, call their .stop() method directly.
def uninitialize()-
Expand source code
def uninitialize():
    """
    Uninitialize the RTMS SDK.

    Call this when you're done using the SDK to release resources.

    Returns:
        bool: True when the SDK was uninitialized, False when it raised (the
        error is logged and the traceback printed, not propagated).
    """
    try:
        _ClientBase.uninitialize()
        Client._sdk_initialized = False
    except Exception as exc:
        log_error("rtms", f"Error uninitializing RTMS SDK: {exc}")
        traceback.print_exc()
        return False
    return True
Call this when you're done using the SDK to release resources.
Classes
class AiInterpreter (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop channelNum
prop lid
prop sampleRate
prop targets
prop timestamp
class AiTargetLanguage (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop engine
prop lid
prop toneId
prop voiceId
class AudioChannel (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var MONO-
Integer-valued member of the AudioChannel enum.
var STEREO-
Integer-valued member of the AudioChannel enum.
class AudioCodec (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var G711-
Integer-valued member of the AudioCodec enum.
var G722-
Integer-valued member of the AudioCodec enum.
var L16-
Integer-valued member of the AudioCodec enum.
var OPUS-
Integer-valued member of the AudioCodec enum.
var UNDEFINED-
Integer-valued member of the AudioCodec enum.
class AudioContentType (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var FILE_STREAM-
Integer-valued member of the AudioContentType enum.
var RAW_AUDIO-
Integer-valued member of the AudioContentType enum.
var RTP-
Integer-valued member of the AudioContentType enum.
var TEXT-
Integer-valued member of the AudioContentType enum.
var UNDEFINED-
Integer-valued member of the AudioContentType enum.
class AudioParams (...)-
__init__(*args, **kwargs) Overloaded function.
- __init__(self: rtms._rtms.AudioParams) -> None
- __init__(self: rtms._rtms.AudioParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, sample_rate: typing.SupportsInt | typing.SupportsIndex, channel: typing.SupportsInt | typing.SupportsIndex, data_opt: typing.SupportsInt | typing.SupportsIndex, duration: typing.SupportsInt | typing.SupportsIndex, frame_size: typing.SupportsInt | typing.SupportsIndex) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop channel
prop codec
prop contentType
prop content_type
prop dataOpt
prop data_opt
prop duration
prop frameSize
prop frame_size
prop sampleRate
prop sample_rate
-
class AudioSampleRate (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var SR_16K-
Integer-valued member of the AudioSampleRate enum.
var SR_32K-
Integer-valued member of the AudioSampleRate enum.
var SR_48K-
Integer-valued member of the AudioSampleRate enum.
var SR_8K-
Integer-valued member of the AudioSampleRate enum.
class Client (executor=None)-
Expand source code
class Client(_ClientBase):
    """
    RTMS Client - provides real-time media streaming capabilities

    This client allows you to join Zoom meetings and process audio, video,
    and transcript data in real-time.
    """

    # Process-wide flag: the C SDK is initialized once, by the first client
    # that reaches _do_alloc_and_join() (guarded by _sdk_init_lock).
    _sdk_initialized = False

    def __init__(self, executor=None):
        """Initialize a new RTMS client.

        Args:
            executor: Optional concurrent.futures.Executor for dispatching data
                callbacks (audio, video, transcript, deskshare) to a thread pool.
                When set, callbacks are submitted via executor.submit() instead of
                running inline. Pass concurrent.futures.ThreadPoolExecutor(n) for
                CPU-bound or I/O-heavy callbacks.
        """
        # super().__init__() is PyClient() — intentionally a no-op at construction
        # time. The C SDK handle is allocated lazily in _do_alloc_and_join(), which
        # runs on the owning EventLoop's thread. This satisfies the C SDK's thread
        # affinity requirement: alloc/join/poll/release must share one OS thread.
        super().__init__()

        self._polling_interval = 10  # milliseconds
        self._running = False
        self._webhook_server = None
        self._executor = executor  # concurrent.futures.Executor or None
        self._loop = None  # asyncio event loop captured at callback-registration time

        # EventLoop that owns this client's lifecycle (set by loop.add() or auto-created)
        self._assigned_loop: Optional['EventLoop'] = None

        # Pending join params — stored until the loop's thread calls _do_alloc_and_join()
        self._pending_join_params: Optional[dict] = None

        # Individual video subscription callbacks
        self._participant_video_callback = None
        self._video_subscribed_callback = None

        # Shared event dispatcher state (matches Node.js setupEventHandler pattern)
        self._event_handler_registered = False
        self._participant_event_callback = None
        self._active_speaker_callback = None
        self._sharing_callback = None
        self._media_interrupted_callback = None
        self._raw_event_callback = None

        # Register with global client registry
        with _clients_lock:
            _clients[id(self)] = self

    def join(self, meeting_uuid: str = None, webinar_uuid: str = None,
             session_id: str = None, engagement_id: str = None,
             rtms_stream_id: str = None, server_urls: str = None,
             signature: str = None, timeout: int = -1, ca: str = None,
             client: str = None, secret: str = None, poll_interval: int = 10,
             **kwargs):
        """
        Queue a join request for an RTMS session.

        Can be called with individual arguments or with a single dictionary of
        parameters passed as the first argument. When a dictionary is passed it
        is used as-is and all other arguments are ignored.

        The connection itself is not made here: the parameters are stored and
        consumed by _do_alloc_and_join() on the owning EventLoop's thread.

        Args:
            meeting_uuid (str): Meeting UUID (for Meeting SDK events), or a dict
                holding all join parameters
            webinar_uuid (str): Webinar UUID (for Webinar events)
            session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
            engagement_id (str): Engagement ID (for ZCC events) - used when meeting_uuid is not provided
            rtms_stream_id (str): RTMS stream ID
            server_urls (str): Server URLs (comma-separated)
            signature (str, optional): Authentication signature. If not provided, will be generated
            timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
            ca (str, optional): CA certificate path. Defaults to system CA files.
            client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
            secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
            poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
            **kwargs: Additional entries merged into the stored join parameters

        Returns:
            bool: Always True — the request has been queued. Validation and
            connection errors are raised later by _do_alloc_and_join().
        """
        # Normalise all call forms into a single params dict
        if isinstance(meeting_uuid, dict):
            params = dict(meeting_uuid)
        else:
            params = {
                'meeting_uuid': meeting_uuid,
                'webinar_uuid': webinar_uuid,
                'session_id': session_id,
                'engagement_id': engagement_id,
                'rtms_stream_id': rtms_stream_id,
                'server_urls': server_urls,
                'signature': signature,
                'timeout': timeout,
                'ca': ca,
                'client': client,
                'secret': secret,
                'poll_interval': poll_interval,
            }
            if kwargs:
                params.update(kwargs)

        # Store params for the loop thread to consume
        self._pending_join_params = params

        # If the client has been assigned to an explicit EventLoop, it will be
        # picked up by that loop's _drain_pending() call — nothing else to do.
        if self._assigned_loop is not None:
            log_debug("client", "join() deferred to assigned EventLoop thread")
            return True

        # If rtms.run() / rtms.run_async() is active, route to the default loop.
        if _default_loop is not None:
            log_debug("client", "join() routed to default EventLoop")
            _default_loop.add(self)
            return True

        # Zero-config (Tier 0): no loop assigned and no run() active —
        # create an implicit single-client EventLoop as a background daemon thread.
        log_debug("client", "No EventLoop assigned — creating implicit single-client loop")
        interval_ms = params.get('poll_interval')
        if interval_ms is None:
            # An explicit None means "use the default" rather than a TypeError
            interval_ms = 10
        implicit_loop = EventLoop(
            poll_interval=interval_ms / 1000.0,
            name='rtms-implicit',
        )
        implicit_loop.add(self)
        implicit_loop.start()
        return True

    def _do_alloc_and_join(self) -> None:
        """
        Called by EventLoop._drain_pending() on the loop's own thread.

        Performs the two operations that must share an OS thread:
          1. alloc() — creates the C SDK handle (rtms_alloc)
          2. join()  — registers callbacks and connects (rtms_set_callbacks + rtms_join)

        Raises:
            RuntimeError: If no join parameters are pending.
            ValueError: If the session identifier, rtms_stream_id or server_urls
                is missing.
            Exception: Any error from SDK initialization, alloc or join is
                logged and re-raised.
        """
        params = self._pending_join_params
        if params is None:
            raise RuntimeError("_do_alloc_and_join called with no pending join params")

        try:
            meeting_uuid = params.get('meeting_uuid')
            webinar_uuid = params.get('webinar_uuid')
            session_id = params.get('session_id')
            engagement_id = params.get('engagement_id')
            rtms_stream_id = params.get('rtms_stream_id')
            server_urls = params.get('server_urls')
            signature = params.get('signature')
            timeout = params.get('timeout', -1)
            # `or` (not a .get default) so a present-but-None key still falls
            # back to the environment variable.
            client_id = params.get('client') or os.getenv('ZM_RTMS_CLIENT')
            secret = params.get('secret') or os.getenv('ZM_RTMS_SECRET')
            poll_interval = params.get('poll_interval')
            if poll_interval is None:
                poll_interval = 10

            instance_id = meeting_uuid or webinar_uuid or session_id or engagement_id

            if not instance_id:
                raise ValueError("meeting_uuid, webinar_uuid, session_id, or engagement_id is required")
            if not rtms_stream_id:
                raise ValueError("rtms_stream_id is required")
            if not server_urls:
                raise ValueError("server_urls is required")

            if not signature:
                signature = generate_signature(client_id, secret, instance_id, rtms_stream_id)

            self._polling_interval = poll_interval

            # Phase 1: ensure SDK is initialized on THIS thread (same thread as alloc/join).
            # The lock ensures init() runs exactly once even if multiple EventLoops start
            # simultaneously, but the actual C call only happens on the first client's thread.
            with _sdk_init_lock:
                if not Client._sdk_initialized:
                    # Honour the caller-supplied `ca` path; None lets
                    # find_ca_certificate() pick a default.
                    ca_path = find_ca_certificate(params.get('ca'))
                    log_debug("client", f"Initializing SDK with CA: {ca_path}")
                    try:
                        _ClientBase.initialize(ca_path, 1, "python-rtms")
                    except Exception:
                        log_debug("client", "Trying SDK initialization with empty CA path")
                        _ClientBase.initialize("", 1, "python-rtms")
                    Client._sdk_initialized = True
                    log_debug("client", "SDK initialized successfully")

            # Phase 2: allocate C SDK handle on this thread
            super().alloc()

            # Label follows the same precedence as instance_id above so the
            # log line always names the identifier that is actually used.
            if meeting_uuid:
                session_type = 'meeting'
            elif webinar_uuid:
                session_type = 'webinar'
            elif session_id:
                session_type = 'session'
            else:
                session_type = 'engagement'
            log_info("client", f"Joining {session_type}: {instance_id}")

            # join() on the same thread as alloc() — C SDK constraint satisfied
            super().join(instance_id, rtms_stream_id, signature, server_urls, timeout)

            self._running = True
            log_info("client", "Successfully joined")

        except Exception as e:
            log_error("client", f"Error in _do_alloc_and_join: {e}")
            traceback.print_exc()
            raise

    def _initialize_rtms(self, ca_path=None):
        """Initialize the RTMS SDK with the best available CA certificate.

        Falls back to an empty CA path if the first attempt fails; if the
        fallback also fails, the original error is raised.

        Args:
            ca_path (str, optional): Preferred CA certificate path.

        Returns:
            bool: True if either initialization attempt succeeded.
        """
        try:
            # Find the best CA certificate
            ca_path = find_ca_certificate(ca_path)

            # Initialize the SDK
            log_debug("client", f"Initializing RTMS with CA: {ca_path}")
            _ClientBase.initialize(ca_path)
            return True
        except Exception as e:
            log_error("client", f"Error initializing RTMS: {e}")
            traceback.print_exc()

            # Try with an empty path as a last resort
            try:
                log_debug("client", "Trying initialization with empty CA path")
                _ClientBase.initialize("")
                return True
            except Exception as e2:
                log_error("client", f"Failed to initialize with empty CA path: {e2}")
                raise e  # Raise the original error

    def poll(self):
        """Poll the C SDK for pending events. Called by the owning EventLoop's thread."""
        if self._running:
            try:
                super().poll()
            except Exception as e:
                log_error("client", f"Error during polling: {e}")

    # ========================================================================
    # Callback Dispatch
    # ========================================================================

    def _wrap_callback(self, callback):
        """Wrap a callback for executor or asyncio dispatch.

        - sync + no executor  → returned unchanged (v1.0 inline behavior)
        - sync + executor     → submitted to executor.submit() on each call
        - async coroutine     → scheduled on the captured asyncio event loop

        Args:
            callback: Callable or coroutine function, or None.

        Returns:
            The callable to hand to the SDK, or None if callback is None.
        """
        if callback is None:
            return None

        if inspect.iscoroutinefunction(callback):
            # Capture the running loop now (at registration time) if one exists.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            self._loop = loop

            def async_wrapper(*args):
                _loop = self._loop
                if _loop and _loop.is_running():
                    asyncio.run_coroutine_threadsafe(callback(*args), _loop)
                    return
                coro = callback(*args)
                try:
                    asyncio.run(coro)
                except RuntimeError as e:
                    # Best-effort dispatch: never propagate into the SDK, but
                    # close the coroutine (avoids "never awaited" warnings) and
                    # leave a trace instead of failing silently.
                    coro.close()
                    log_debug("client", f"Async callback not run: {e}")

            return async_wrapper

        executor = self._executor or _run_executor
        if executor is not None:
            def executor_wrapper(*args):
                executor.submit(callback, *args)
            return executor_wrapper

        return callback

    # ========================================================================
    # Data Callbacks (Python-level so _wrap_callback applies and aliases work)
    # ========================================================================

    def on_audio_data(self, callback) -> None:
        """Register audio data callback. Supports executor and async coroutines."""
        super().on_audio_data(self._wrap_callback(callback))

    onAudioData = on_audio_data

    def on_video_data(self, callback) -> None:
        """Register video data callback. Supports executor and async coroutines."""
        super().on_video_data(self._wrap_callback(callback))

    onVideoData = on_video_data

    def on_deskshare_data(self, callback) -> None:
        """Register deskshare data callback. Supports executor and async coroutines."""
        super().on_deskshare_data(self._wrap_callback(callback))

    onDeskshareData = on_deskshare_data

    def on_transcript_data(self, callback) -> None:
        """Register transcript data callback. Supports executor and async coroutines."""
        super().on_transcript_data(self._wrap_callback(callback))

    onTranscriptData = on_transcript_data

    # ========================================================================
    # Context Manager
    # ========================================================================

    def __enter__(self):
        """Support `with rtms.Client() as client:` usage."""
        return self

    def __exit__(self, *_):
        """Call leave() on context exit. Exceptions are not suppressed."""
        self.leave()
        return False

    def stop(self):
        """
        Stop RTMS client and release resources.

        This is equivalent to calling leave(), but with a more intuitive name.
        """
        return self.leave()

    def set_audio_params(self, params):
        """
        Set audio parameters with validation.

        Args:
            params (AudioParams): Audio parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_audio_params(params)
        return super().set_audio_params(params)

    # camelCase legacy alias
    setAudioParams = set_audio_params

    def set_video_params(self, params):
        """
        Set video parameters with validation.

        Args:
            params (VideoParams): Video parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_video_params(params)
        return super().set_video_params(params)

    # camelCase legacy alias
    setVideoParams = set_video_params

    def set_deskshare_params(self, params):
        """
        Set deskshare parameters with validation.

        Args:
            params (DeskshareParams): Deskshare parameters object

        Returns:
            bool: True if parameters were set successfully

        Raises:
            ValueError: If parameters are invalid
        """
        _validate_deskshare_params(params)
        return super().set_deskshare_params(params)

    # camelCase legacy alias
    setDeskshareParams = set_deskshare_params

    def set_transcript_params(self, params):
        """
        Set transcript parameters.

        Args:
            params (TranscriptParams): Transcript parameters object

        Returns:
            bool: True if parameters were set successfully
        """
        return super().set_transcript_params(params)

    # camelCase legacy alias
    setTranscriptParams = set_transcript_params

    def set_proxy(self, proxy_type: str, proxy_url: str) -> None:
        """Configure a proxy for SDK connections.

        Args:
            proxy_type (str): Proxy protocol type (e.g. 'http', 'https').
            proxy_url (str): Full proxy URL including host and port.
        """
        return super().set_proxy(proxy_type, proxy_url)

    # camelCase legacy alias
    setProxy = set_proxy

    def subscribe_video(self, user_id: int, subscribe: bool) -> None:
        """Subscribe or unsubscribe from an individual participant's video stream.

        Args:
            user_id (int): The participant's user ID.
            subscribe (bool): True to subscribe, False to unsubscribe.
        """
        return super().subscribe_video(user_id, subscribe)

    subscribeVideo = subscribe_video

    def on_participant_video(self, callback) -> None:
        """Register a callback for participant video state changes.

        The callback receives (users: list[int], is_on: bool).
        """
        self._participant_video_callback = callback
        super().on_participant_video(callback)

    onParticipantVideo = on_participant_video

    def on_video_subscribed(self, callback) -> None:
        """Register a callback for video subscription responses.

        The callback receives (user_id: int, status: int, error: str).
        """
        self._video_subscribed_callback = callback
        super().on_video_subscribed(callback)

    onVideoSubscribed = on_video_subscribed

    def subscribe_event(self, events):
        """
        Subscribe to receive specific event types.

        Note: Calling on_participant_event() automatically subscribes to
        EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.

        Args:
            events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])

        Returns:
            bool: True if subscription was successful
        """
        return super().subscribe_event(events)

    # camelCase legacy alias
    subscribeEvent = subscribe_event

    def unsubscribe_event(self, events):
        """
        Unsubscribe from specific event types.

        Args:
            events: List of event type constants

        Returns:
            bool: True if unsubscription was successful
        """
        return super().unsubscribe_event(events)

    # camelCase legacy alias
    unsubscribeEvent = unsubscribe_event

    def _setup_event_handler(self):
        """
        Internal shared event dispatcher that routes events to typed callbacks.
        Matches the Node.js setupEventHandler() pattern. Only registers once.

        The "registered" flag is set only after the base-class registration
        succeeds, so a failed registration can be retried by a later call.
        """
        if self._event_handler_registered:
            return

        def _participants(data):
            # Reduce each participant entry to the two documented fields.
            return [
                {'user_id': p.get('user_id'), 'user_name': p.get('user_name')}
                for p in data.get('participants', [])
            ]

        def event_dispatcher(event_data: str):
            # Raw callback fires first and in addition to the typed callbacks.
            if self._raw_event_callback:
                self._raw_event_callback(event_data)

            try:
                data = json.loads(event_data)
                event_type = data.get('event_type')

                if event_type == EVENT_PARTICIPANT_JOIN:
                    if self._participant_event_callback:
                        self._participant_event_callback(
                            'join', data.get('timestamp', 0), _participants(data)
                        )
                elif event_type == EVENT_PARTICIPANT_LEAVE:
                    if self._participant_event_callback:
                        self._participant_event_callback(
                            'leave', data.get('timestamp', 0), _participants(data)
                        )
                elif event_type == EVENT_ACTIVE_SPEAKER_CHANGE:
                    if self._active_speaker_callback:
                        self._active_speaker_callback(
                            data.get('timestamp', 0),
                            data.get('user_id', 0),
                            data.get('user_name', '')
                        )
                elif event_type == EVENT_SHARING_START:
                    if self._sharing_callback:
                        self._sharing_callback(
                            'start',
                            data.get('timestamp', 0),
                            data.get('user_id'),
                            data.get('user_name')
                        )
                elif event_type == EVENT_SHARING_STOP:
                    if self._sharing_callback:
                        self._sharing_callback(
                            'stop',
                            data.get('timestamp', 0),
                            None,
                            None
                        )
                elif event_type == EVENT_MEDIA_CONNECTION_INTERRUPTED:
                    if self._media_interrupted_callback:
                        self._media_interrupted_callback(data.get('timestamp', 0))
            except Exception as e:
                # Also catches errors raised by the typed callbacks themselves.
                log_error('client', f'Failed to parse event: {e}')

        super().on_event_ex(event_dispatcher)
        self._event_handler_registered = True

    def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool:
        """
        Register a callback for participant join/leave events.

        This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE.
        Events are delivered as parsed objects, not raw JSON.

        Args:
            callback: Function called with (event, timestamp, participants) where:
                - event: 'join' or 'leave'
                - timestamp: Unix timestamp in milliseconds
                - participants: List of dicts with 'user_id' and optional 'user_name'

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_participant(event, timestamp, participants):
            ...     print(f"Participant {event}: {participants}")
            >>> client.on_participant_event(on_participant)
        """
        self._participant_event_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to participant events: {e}')
        return True

    # camelCase legacy alias
    onParticipantEvent = on_participant_event

    def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool:
        """
        Register a callback for active speaker change events.

        This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.

        Args:
            callback: Function called with (timestamp, user_id, user_name)

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_speaker(timestamp, user_id, user_name):
            ...     print(f"Active speaker: {user_name} ({user_id})")
            >>> client.on_active_speaker_event(on_speaker)
        """
        self._active_speaker_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}')
        return True

    # camelCase legacy alias
    onActiveSpeakerEvent = on_active_speaker_event

    def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool:
        """
        Register a callback for sharing start/stop events.

        This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP.
        Note: These events only work when the RTMS app has DESKSHARE scope permission.

        Args:
            callback: Function called with (event, timestamp, user_id, user_name) where:
                - event: 'start' or 'stop'
                - timestamp: Unix timestamp in milliseconds
                - user_id: Optional user ID (only for 'start')
                - user_name: Optional user name (only for 'start')

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_sharing(event, timestamp, user_id, user_name):
            ...     print(f"Sharing {event} by {user_name}")
            >>> client.on_sharing_event(on_sharing)
        """
        self._sharing_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to sharing events: {e}')
        return True

    # camelCase legacy alias
    onSharingEvent = on_sharing_event

    def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool:
        """
        Register a callback for media connection interrupted events.

        This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.

        Args:
            callback: Function called with (timestamp,) when the media connection is interrupted

        Returns:
            bool: True if registration succeeds

        Example:
            >>> def on_interrupted(timestamp):
            ...     print(f"Media connection interrupted at {timestamp}")
            >>> client.on_media_connection_interrupted(on_interrupted)
        """
        self._media_interrupted_callback = callback
        self._setup_event_handler()
        try:
            self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED])
        except Exception as e:
            log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}')
        return True

    # camelCase legacy alias
    onMediaConnectionInterrupted = on_media_connection_interrupted

    def on_event_ex(self, callback: Callable[[str], None]) -> bool:
        """
        Register a callback for raw event data.

        This provides access to the raw JSON event data from the SDK.
        Use this when you need custom event handling or access to all event types.
        This callback is called IN ADDITION to typed callbacks, not instead of.

        Args:
            callback: Function called with raw JSON event data string

        Returns:
            bool: True if registration succeeds
        """
        self._raw_event_callback = callback
        self._setup_event_handler()
        return True

    # camelCase legacy alias
    onEventEx = on_event_ex

    def leave(self):
        """
        Leave the RTMS session.

        Signals the owning EventLoop to stop polling this client and releases
        the C SDK handle. Safe to call from any thread.

        Returns:
            bool: True if left successfully
        """
        log_info("client", "Leaving RTMS session")

        # Signal the EventLoop to stop polling this client
        self._running = False

        # Unregister from global client registry
        with _clients_lock:
            _clients.pop(id(self), None)

        # Stop webhook server if we have one
        if self._webhook_server:
            self._webhook_server.stop()
            self._webhook_server = None

        try:
            # Release RTMS resources
            super().release()
            return True
        except Exception as e:
            log_error("client", f"Error releasing RTMS resources: {e}")
            traceback.print_exc()
            return False

    def on_webhook_event(self, callback=None, port=None, path=None):
        """
        Register a webhook event handler.

        This can be used as a decorator or a direct method call:

            @client.on_webhook_event(port=8080, path='/webhook')
            def handle_webhook(payload):
                print(f"Received webhook: {payload}")

        Args:
            callback (callable, optional): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

        Returns:
            callable: The callback itself when one is passed directly (bare
            decorator or method call); otherwise a decorator function.
        """
        # Bare decorator or direct call: honour port/path if they were given
        # (they are None for the bare-decorator form, which keeps the defaults).
        if callback is not None and callable(callback):
            self._start_webhook_server(callback, port, path)
            return callback

        # If used as a decorator with arguments
        def decorator(func):
            self._start_webhook_server(func, port, path)
            return func

        return decorator

    def _start_webhook_server(self, callback, port=None, path=None):
        """
        Start the webhook server

        A server is created only if this client does not already have one;
        start() is then called on the existing or new server with the callback.

        Args:
            callback (callable): Function to call when a webhook is received
            port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
            path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
        """
        if not self._webhook_server:
            port = port or int(os.getenv('ZM_RTMS_PORT', '8080'))
            path = path or os.getenv('ZM_RTMS_PATH', '/')
            self._webhook_server = WebhookServer(port, path)

        self._webhook_server.start(callback)
This client allows you to join Zoom meetings and process audio, video, and transcript data in real-time.
Initialize a new RTMS client.
Args
executor- Optional concurrent.futures.Executor for dispatching data callbacks (audio, video, transcript, deskshare) to a thread pool. When set, callbacks are submitted via executor.submit() instead of running inline. Pass concurrent.futures.ThreadPoolExecutor(n) for CPU-bound or I/O-heavy callbacks.
Ancestors
- rtms._rtms.Client
- pybind11_builtins.pybind11_object
Methods
def join(self,
meeting_uuid: str = None,
webinar_uuid: str = None,
session_id: str = None,
engagement_id: str = None,
rtms_stream_id: str = None,
server_urls: str = None,
signature: str = None,
timeout: int = -1,
ca: str = None,
client: str = None,
secret: str = None,
poll_interval: int = 10,
**kwargs)-
Expand source code
def join(self, meeting_uuid: str = None, webinar_uuid: str = None,
         session_id: str = None, engagement_id: str = None,
         rtms_stream_id: str = None, server_urls: str = None,
         signature: str = None, timeout: int = -1, ca: str = None,
         client: str = None, secret: str = None, poll_interval: int = 10,
         **kwargs):
    """
    Queue a join request for an RTMS session.

    Can be called with individual arguments or with a single dictionary of
    parameters passed as the first argument. When a dictionary is passed it
    is used as-is and all other arguments are ignored.

    The parameters are only stored here; this method does not connect.

    Args:
        meeting_uuid (str): Meeting UUID (for Meeting SDK events), or a dict
            holding all join parameters
        webinar_uuid (str): Webinar UUID (for Webinar events)
        session_id (str): Session ID (for Video SDK events) - used when meeting_uuid is not provided
        engagement_id (str): Engagement ID (for ZCC events) - used when meeting_uuid is not provided
        rtms_stream_id (str): RTMS stream ID
        server_urls (str): Server URLs (comma-separated)
        signature (str, optional): Authentication signature. If not provided, will be generated
        timeout (int, optional): Timeout in milliseconds. Defaults to -1 (no timeout).
        ca (str, optional): CA certificate path. Defaults to system CA files.
        client (str, optional): Client ID. If empty, uses ZM_RTMS_CLIENT env var.
        secret (str, optional): Client secret. If empty, uses ZM_RTMS_SECRET env var.
        poll_interval (int, optional): Polling interval in milliseconds. Defaults to 10.
        **kwargs: Additional entries merged into the stored join parameters

    Returns:
        bool: Always True — the request has been queued for an EventLoop.
    """
    # Normalise all call forms into a single params dict
    if isinstance(meeting_uuid, dict):
        params = dict(meeting_uuid)
    else:
        params = {
            'meeting_uuid': meeting_uuid,
            'webinar_uuid': webinar_uuid,
            'session_id': session_id,
            'engagement_id': engagement_id,
            'rtms_stream_id': rtms_stream_id,
            'server_urls': server_urls,
            'signature': signature,
            'timeout': timeout,
            'ca': ca,
            'client': client,
            'secret': secret,
            'poll_interval': poll_interval,
        }
        if kwargs:
            params.update(kwargs)

    # Store params for the loop thread to consume
    self._pending_join_params = params

    # If the client has been assigned to an explicit EventLoop, it will be
    # picked up by that loop's _drain_pending() call — nothing else to do.
    if self._assigned_loop is not None:
        log_debug("client", "join() deferred to assigned EventLoop thread")
        return True

    # If rtms.run() / rtms.run_async() is active, route to the default loop.
    if _default_loop is not None:
        log_debug("client", "join() routed to default EventLoop")
        _default_loop.add(self)
        return True

    # Zero-config (Tier 0): no loop assigned and no run() active —
    # create an implicit single-client EventLoop as a background daemon thread.
    log_debug("client", "No EventLoop assigned — creating implicit single-client loop")
    interval_ms = params.get('poll_interval')
    if interval_ms is None:
        # An explicit None means "use the default" rather than a TypeError
        interval_ms = 10
    implicit_loop = EventLoop(
        poll_interval=interval_ms / 1000.0,
        name='rtms-implicit',
    )
    implicit_loop.add(self)
    implicit_loop.start()
    return True
Can be called with positional arguments or with a dictionary of parameters.
Args
meeting_uuid:str- Meeting UUID (for Meeting SDK events)
webinar_uuid:str- Webinar UUID (for Webinar events)
session_id:str- Session ID (for Video SDK events) - used when meeting_uuid is not provided
engagement_id:str- Engagement ID (for ZCC events) - used when meeting_uuid is not provided
rtms_stream_id:str- RTMS stream ID
server_urls:str- Server URLs (comma-separated)
signature:str, optional- Authentication signature. If not provided, will be generated
timeout:int, optional- Timeout in milliseconds. Defaults to -1 (no timeout).
ca:str, optional- CA certificate path. Defaults to system CA files.
client:str, optional- Client ID. If empty, uses ZM_RTMS_CLIENT env var.
secret:str, optional- Client secret. If empty, uses ZM_RTMS_SECRET env var.
poll_interval:int, optional- Polling interval in milliseconds. Defaults to 10.
**kwargs- Additional arguments passed to join
Returns
bool- True if joined successfully, False otherwise
def leave(self)-
Expand source code
def leave(self):
    """
    Leave the RTMS session.

    Clears the running flag, removes this client from the global registry,
    stops the webhook server if one exists, then releases the C SDK handle.

    Returns:
        bool: True if the release succeeded, False if it raised
    """
    log_info("client", "Leaving RTMS session")

    # Signal the EventLoop to stop polling this client
    self._running = False

    # Drop this client from the global registry (no error if already absent)
    with _clients_lock:
        _clients.pop(id(self), None)

    # Shut down the webhook server, if any
    server = self._webhook_server
    if server:
        server.stop()
        self._webhook_server = None

    # Release RTMS resources; failures are logged, not raised
    try:
        super().release()
    except Exception as err:
        log_error("client", f"Error releasing RTMS resources: {err}")
        traceback.print_exc()
        return False
    return True
Signals the owning EventLoop to stop polling this client and releases the C SDK handle. Safe to call from any thread.
Returns
bool- True if left successfully
def onActiveSpeakerEvent(self, callback: Callable[[int, int, str], None]) ‑> bool-
Expand source code
def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool:
    """
    Register a callback for active speaker change events.

    Stores the callback, makes sure the shared event handler is set up, and
    tries to subscribe to EVENT_ACTIVE_SPEAKER_CHANGE. A failed subscription
    is logged as a warning and does not change the return value.

    Args:
        callback: Function called with (timestamp, user_id, user_name)

    Returns:
        bool: Always True

    Example:
        >>> def on_speaker(timestamp, user_id, user_name):
        ...     print(f"Active speaker: {user_name} ({user_id})")
        >>> client.on_active_speaker_event(on_speaker)
    """
    self._active_speaker_callback = callback
    self._setup_event_handler()

    try:
        self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE])
    except Exception as err:
        log_warn('client', f'Failed to auto-subscribe to active speaker events: {err}')

    return True
This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.
Args
callback- Function called with (timestamp, user_id, user_name)
Returns
bool- True if registration succeeds
Example
>>> def on_speaker(timestamp, user_id, user_name):
...     print(f"Active speaker: {user_name} ({user_id})")
>>> client.on_active_speaker_event(on_speaker)
def onAudioData(self, callback) ‑> None-
Expand source code
def on_audio_data(self, callback) -> None:
    """Register the audio data callback.

    The callback is first passed through _wrap_callback() and the result is
    registered with the base class. Supports executor and async coroutines.
    """
    wrapped = self._wrap_callback(callback)
    super().on_audio_data(wrapped)
def onDeskshareData(self, callback) ‑> None-
Expand source code
def on_deskshare_data(self, callback) -> None:
    """Register the deskshare data callback.

    The callback is first passed through _wrap_callback() and the result is
    registered with the base class. Supports executor and async coroutines.
    """
    wrapped = self._wrap_callback(callback)
    super().on_deskshare_data(wrapped)
def onEventEx(self, callback: Callable[[str], None]) ‑> bool-
Expand source code
def on_event_ex(self, callback: Callable[[str], None]) -> bool:
    """
    Register a callback for raw event data.

    Gives access to the raw JSON event data from the SDK, for custom event
    handling or for event types without a typed callback. The raw callback
    is called IN ADDITION to typed callbacks, not instead of them.

    Args:
        callback: Function called with raw JSON event data string

    Returns:
        bool: Always True
    """
    # Store first, then make sure the shared dispatcher is installed
    self._raw_event_callback = callback
    self._setup_event_handler()

    return True
This provides access to the raw JSON event data from the SDK. Use this when you need custom event handling or access to all event types. This callback is called IN ADDITION to typed callbacks, not instead of.
Args
callback- Function called with raw JSON event data string
Returns
bool- True if registration succeeds
def onMediaConnectionInterrupted(self, callback: Callable[[int], None]) ‑> bool-
Expand source code
def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool: """ Register a callback for media connection interrupted events. This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED. Args: callback: Function called with (timestamp,) when the media connection is interrupted Returns: bool: True if registration succeeds Example: >>> def on_interrupted(timestamp): ... print(f"Media connection interrupted at {timestamp}") >>> client.on_media_connection_interrupted(on_interrupted) """ self._media_interrupted_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}') return TrueRegister a callback for media connection interrupted events.
This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.
Args
callback- Function called with (timestamp,) when the media connection is interrupted
Returns
bool- True if registration succeeds
Example
>>> def on_interrupted(timestamp):
...     print(f"Media connection interrupted at {timestamp}")
>>> client.on_media_connection_interrupted(on_interrupted)
def onParticipantEvent(self, callback: Callable[[str, int, list], None]) ‑> bool-
Expand source code
def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool: """ Register a callback for participant join/leave events. This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON. Args: callback: Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name' Returns: bool: True if registration succeeds Example: >>> def on_participant(event, timestamp, participants): ... print(f"Participant {event}: {participants}") >>> client.on_participant_event(on_participant) """ self._participant_event_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to participant events: {e}') return TrueRegister a callback for participant join/leave events.
This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.
Args
callback- Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name'
Returns
bool- True if registration succeeds
Example
>>> def on_participant(event, timestamp, participants):
...     print(f"Participant {event}: {participants}")
>>> client.on_participant_event(on_participant)
def onParticipantVideo(self, callback) ‑> None-
Expand source code
def on_participant_video(self, callback) -> None: """Register a callback for participant video state changes. The callback receives (users: list[int], is_on: bool). """ self._participant_video_callback = callback super().on_participant_video(callback)Register a callback for participant video state changes.
The callback receives (users: list[int], is_on: bool).
def onSharingEvent(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool-
Expand source code
def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool: """ Register a callback for sharing start/stop events. This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission. Args: callback: Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start') Returns: bool: True if registration succeeds Example: >>> def on_sharing(event, timestamp, user_id, user_name): ... print(f"Sharing {event} by {user_name}") >>> client.on_sharing_event(on_sharing) """ self._sharing_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to sharing events: {e}') return TrueRegister a callback for sharing start/stop events.
This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.
Args
callback- Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start')
Returns
bool- True if registration succeeds
Example
>>> def on_sharing(event, timestamp, user_id, user_name):
...     print(f"Sharing {event} by {user_name}")
>>> client.on_sharing_event(on_sharing)
def onTranscriptData(self, callback) ‑> None-
Expand source code
def on_transcript_data(self, callback) -> None: """Register transcript data callback. Supports executor and async coroutines.""" super().on_transcript_data(self._wrap_callback(callback))Register transcript data callback. Supports executor and async coroutines.
def onVideoData(self, callback) ‑> None-
Expand source code
def on_video_data(self, callback) -> None: """Register video data callback. Supports executor and async coroutines.""" super().on_video_data(self._wrap_callback(callback))Register video data callback. Supports executor and async coroutines.
def onVideoSubscribed(self, callback) ‑> None-
Expand source code
def on_video_subscribed(self, callback) -> None: """Register a callback for video subscription responses. The callback receives (user_id: int, status: int, error: str). """ self._video_subscribed_callback = callback super().on_video_subscribed(callback)Register a callback for video subscription responses.
The callback receives (user_id: int, status: int, error: str).
def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) ‑> bool-
Expand source code
def on_active_speaker_event(self, callback: Callable[[int, int, str], None]) -> bool: """ Register a callback for active speaker change events. This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE. Args: callback: Function called with (timestamp, user_id, user_name) Returns: bool: True if registration succeeds Example: >>> def on_speaker(timestamp, user_id, user_name): ... print(f"Active speaker: {user_name} ({user_id})") >>> client.on_active_speaker_event(on_speaker) """ self._active_speaker_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_ACTIVE_SPEAKER_CHANGE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to active speaker events: {e}') return TrueRegister a callback for active speaker change events.
This automatically subscribes to EVENT_ACTIVE_SPEAKER_CHANGE.
Args
callback- Function called with (timestamp, user_id, user_name)
Returns
bool- True if registration succeeds
Example
>>> def on_speaker(timestamp, user_id, user_name):
...     print(f"Active speaker: {user_name} ({user_id})")
>>> client.on_active_speaker_event(on_speaker)
def on_audio_data(self, callback) ‑> None-
Expand source code
def on_audio_data(self, callback) -> None: """Register audio data callback. Supports executor and async coroutines.""" super().on_audio_data(self._wrap_callback(callback))Register audio data callback. Supports executor and async coroutines.
def on_deskshare_data(self, callback) ‑> None-
Expand source code
def on_deskshare_data(self, callback) -> None: """Register deskshare data callback. Supports executor and async coroutines.""" super().on_deskshare_data(self._wrap_callback(callback))Register deskshare data callback. Supports executor and async coroutines.
def on_event_ex(self, callback: Callable[[str], None]) ‑> bool-
Expand source code
def on_event_ex(self, callback: Callable[[str], None]) -> bool: """ Register a callback for raw event data. This provides access to the raw JSON event data from the SDK. Use this when you need custom event handling or access to all event types. This callback is called IN ADDITION to typed callbacks, not instead of. Args: callback: Function called with raw JSON event data string Returns: bool: True if registration succeeds """ self._raw_event_callback = callback self._setup_event_handler() return TrueRegister a callback for raw event data.
This provides access to the raw JSON event data from the SDK. Use this when you need custom event handling or access to all event types. This callback is called IN ADDITION to typed callbacks, not instead of.
Args
callback- Function called with raw JSON event data string
Returns
bool- True if registration succeeds
def on_media_connection_interrupted(self, callback: Callable[[int], None]) ‑> bool-
Expand source code
def on_media_connection_interrupted(self, callback: Callable[[int], None]) -> bool: """ Register a callback for media connection interrupted events. This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED. Args: callback: Function called with (timestamp,) when the media connection is interrupted Returns: bool: True if registration succeeds Example: >>> def on_interrupted(timestamp): ... print(f"Media connection interrupted at {timestamp}") >>> client.on_media_connection_interrupted(on_interrupted) """ self._media_interrupted_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_MEDIA_CONNECTION_INTERRUPTED]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to media connection interrupted events: {e}') return TrueRegister a callback for media connection interrupted events.
This automatically subscribes to EVENT_MEDIA_CONNECTION_INTERRUPTED.
Args
callback- Function called with (timestamp,) when the media connection is interrupted
Returns
bool- True if registration succeeds
Example
>>> def on_interrupted(timestamp):
...     print(f"Media connection interrupted at {timestamp}")
>>> client.on_media_connection_interrupted(on_interrupted)
def on_participant_event(self, callback: Callable[[str, int, list], None]) ‑> bool-
Expand source code
def on_participant_event(self, callback: Callable[[str, int, list], None]) -> bool: """ Register a callback for participant join/leave events. This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON. Args: callback: Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name' Returns: bool: True if registration succeeds Example: >>> def on_participant(event, timestamp, participants): ... print(f"Participant {event}: {participants}") >>> client.on_participant_event(on_participant) """ self._participant_event_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to participant events: {e}') return TrueRegister a callback for participant join/leave events.
This automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE. Events are delivered as parsed objects, not raw JSON.
Args
callback- Function called with (event, timestamp, participants) where: - event: 'join' or 'leave' - timestamp: Unix timestamp in milliseconds - participants: List of dicts with 'user_id' and optional 'user_name'
Returns
bool- True if registration succeeds
Example
>>> def on_participant(event, timestamp, participants):
...     print(f"Participant {event}: {participants}")
>>> client.on_participant_event(on_participant)
def on_participant_video(self, callback) ‑> None-
Expand source code
def on_participant_video(self, callback) -> None: """Register a callback for participant video state changes. The callback receives (users: list[int], is_on: bool). """ self._participant_video_callback = callback super().on_participant_video(callback)Register a callback for participant video state changes.
The callback receives (users: list[int], is_on: bool).
def on_sharing_event(self, callback: Callable[[str, int, int | None, str | None], None]) ‑> bool-
Expand source code
def on_sharing_event(self, callback: Callable[[str, int, Optional[int], Optional[str]], None]) -> bool: """ Register a callback for sharing start/stop events. This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission. Args: callback: Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start') Returns: bool: True if registration succeeds Example: >>> def on_sharing(event, timestamp, user_id, user_name): ... print(f"Sharing {event} by {user_name}") >>> client.on_sharing_event(on_sharing) """ self._sharing_callback = callback self._setup_event_handler() try: self.subscribe_event([EVENT_SHARING_START, EVENT_SHARING_STOP]) except Exception as e: log_warn('client', f'Failed to auto-subscribe to sharing events: {e}') return TrueRegister a callback for sharing start/stop events.
This automatically subscribes to EVENT_SHARING_START and EVENT_SHARING_STOP. Note: These events only work when the RTMS app has DESKSHARE scope permission.
Args
callback- Function called with (event, timestamp, user_id, user_name) where: - event: 'start' or 'stop' - timestamp: Unix timestamp in milliseconds - user_id: Optional user ID (only for 'start') - user_name: Optional user name (only for 'start')
Returns
bool- True if registration succeeds
Example
>>> def on_sharing(event, timestamp, user_id, user_name):
...     print(f"Sharing {event} by {user_name}")
>>> client.on_sharing_event(on_sharing)
def on_transcript_data(self, callback) ‑> None-
Expand source code
def on_transcript_data(self, callback) -> None: """Register transcript data callback. Supports executor and async coroutines.""" super().on_transcript_data(self._wrap_callback(callback))Register transcript data callback. Supports executor and async coroutines.
def on_video_data(self, callback) ‑> None-
Expand source code
def on_video_data(self, callback) -> None: """Register video data callback. Supports executor and async coroutines.""" super().on_video_data(self._wrap_callback(callback))Register video data callback. Supports executor and async coroutines.
def on_video_subscribed(self, callback) ‑> None-
Expand source code
def on_video_subscribed(self, callback) -> None: """Register a callback for video subscription responses. The callback receives (user_id: int, status: int, error: str). """ self._video_subscribed_callback = callback super().on_video_subscribed(callback)Register a callback for video subscription responses.
The callback receives (user_id: int, status: int, error: str).
def on_webhook_event(self, callback=None, port=None, path=None)-
Expand source code
def on_webhook_event(self, callback=None, port=None, path=None):
    """
    Register a webhook event handler.

    This can be used as a bare decorator, a decorator with arguments,
    or a direct method call:

        @client.on_webhook_event(port=8080, path='/webhook')
        def handle_webhook(payload):
            print(f"Received webhook: {payload}")

        client.on_webhook_event(handle_webhook, port=8080, path='/webhook')

    Args:
        callback (callable, optional): Function to call when a webhook is received
        port (int, optional): Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
        path (str, optional): URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'

    Returns:
        callable: The callback itself when one is passed directly; otherwise
        a decorator that registers the function it is applied to and
        returns that function unchanged.
    """
    # Bare decorator (@client.on_webhook_event) or direct method call.
    if callback is not None and callable(callback):
        if port is None and path is None:
            # Nothing to forward: keep the original single-argument call shape.
            self._start_webhook_server(callback)
        else:
            # Direct call with options: forward them instead of silently
            # dropping port/path as the previous implementation did.
            self._start_webhook_server(callback, port, path)
        return callback

    # Decorator with arguments: @client.on_webhook_event(port=..., path=...)
    def decorator(func):
        self._start_webhook_server(func, port, path)
        return func

    return decorator
This can be used as a decorator or a direct method call:
@client.on_webhook_event(port=8080, path='/webhook')
def handle_webhook(payload):
    print(f"Received webhook: {payload}")
Args
callback:callable, optional- Function to call when a webhook is received
port:int, optional- Port to listen on. Defaults to ZM_RTMS_PORT env var or 8080
path:str, optional- URL path to listen on. Defaults to ZM_RTMS_PATH env var or '/'
Returns
callable- Decorator function if used as a decorator
def poll(self)-
Expand source code
def poll(self): """Poll the C SDK for pending events. Called by the owning EventLoop's thread.""" if self._running: try: super().poll() except Exception as e: log_error("client", f"Error during polling: {e}")Poll the C SDK for pending events. Called by the owning EventLoop's thread.
def setAudioParams(self, params)-
Expand source code
def set_audio_params(self, params): """ Set audio parameters with validation. Args: params (AudioParams): Audio parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_audio_params(params) return super().set_audio_params(params)Set audio parameters with validation.
Args
params:AudioParams- Audio parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def setDeskshareParams(self, params)-
Expand source code
def set_deskshare_params(self, params): """ Set deskshare parameters with validation. Args: params (DeskshareParams): Deskshare parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_deskshare_params(params) return super().set_deskshare_params(params)Set deskshare parameters with validation.
Args
params:DeskshareParams- Deskshare parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def setProxy(self, proxy_type: str, proxy_url: str) ‑> None-
Expand source code
def set_proxy(self, proxy_type: str, proxy_url: str) -> None: """Configure a proxy for SDK connections. Args: proxy_type (str): Proxy protocol type (e.g. 'http', 'https'). proxy_url (str): Full proxy URL including host and port. """ return super().set_proxy(proxy_type, proxy_url)Configure a proxy for SDK connections.
Args
proxy_type:str- Proxy protocol type (e.g. 'http', 'https').
proxy_url:str- Full proxy URL including host and port.
def setTranscriptParams(self, params)-
Expand source code
def set_transcript_params(self, params): """ Set transcript parameters. Args: params (TranscriptParams): Transcript parameters object Returns: bool: True if parameters were set successfully """ return super().set_transcript_params(params)Set transcript parameters.
Args
params:TranscriptParams- Transcript parameters object
Returns
bool- True if parameters were set successfully
def setVideoParams(self, params)-
Expand source code
def set_video_params(self, params): """ Set video parameters with validation. Args: params (VideoParams): Video parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_video_params(params) return super().set_video_params(params)Set video parameters with validation.
Args
params:VideoParams- Video parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def set_audio_params(self, params)-
Expand source code
def set_audio_params(self, params): """ Set audio parameters with validation. Args: params (AudioParams): Audio parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_audio_params(params) return super().set_audio_params(params)Set audio parameters with validation.
Args
params:AudioParams- Audio parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def set_deskshare_params(self, params)-
Expand source code
def set_deskshare_params(self, params): """ Set deskshare parameters with validation. Args: params (DeskshareParams): Deskshare parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_deskshare_params(params) return super().set_deskshare_params(params)Set deskshare parameters with validation.
Args
params:DeskshareParams- Deskshare parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def set_proxy(self, proxy_type: str, proxy_url: str) ‑> None-
Expand source code
def set_proxy(self, proxy_type: str, proxy_url: str) -> None: """Configure a proxy for SDK connections. Args: proxy_type (str): Proxy protocol type (e.g. 'http', 'https'). proxy_url (str): Full proxy URL including host and port. """ return super().set_proxy(proxy_type, proxy_url)Configure a proxy for SDK connections.
Args
proxy_type:str- Proxy protocol type (e.g. 'http', 'https').
proxy_url:str- Full proxy URL including host and port.
def set_transcript_params(self, params)-
Expand source code
def set_transcript_params(self, params): """ Set transcript parameters. Args: params (TranscriptParams): Transcript parameters object Returns: bool: True if parameters were set successfully """ return super().set_transcript_params(params)Set transcript parameters.
Args
params:TranscriptParams- Transcript parameters object
Returns
bool- True if parameters were set successfully
def set_video_params(self, params)-
Expand source code
def set_video_params(self, params): """ Set video parameters with validation. Args: params (VideoParams): Video parameters object Returns: bool: True if parameters were set successfully Raises: ValueError: If parameters are invalid """ _validate_video_params(params) return super().set_video_params(params)Set video parameters with validation.
Args
params:VideoParams- Video parameters object
Returns
bool- True if parameters were set successfully
Raises
ValueError- If parameters are invalid
def stop(self)-
Expand source code
def stop(self): """ Stop RTMS client and release resources. This is equivalent to calling leave(), but with a more intuitive name. """ return self.leave()Stop RTMS client and release resources.
This is equivalent to calling leave(), but with a more intuitive name.
def subscribeEvent(self, events)-
Expand source code
def subscribe_event(self, events): """ Subscribe to receive specific event types. Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events. Args: events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) Returns: bool: True if subscription was successful """ return super().subscribe_event(events)Subscribe to receive specific event types.
Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.
Args
events- List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
Returns
bool- True if subscription was successful
def subscribeVideo(self, user_id: int, subscribe: bool) ‑> None-
Expand source code
def subscribe_video(self, user_id: int, subscribe: bool) -> None: """Subscribe or unsubscribe from an individual participant's video stream. Args: user_id (int): The participant's user ID. subscribe (bool): True to subscribe, False to unsubscribe. """ return super().subscribe_video(user_id, subscribe)Subscribe or unsubscribe from an individual participant's video stream.
Args
user_id:int- The participant's user ID.
subscribe:bool- True to subscribe, False to unsubscribe.
def subscribe_event(self, events)-
Expand source code
def subscribe_event(self, events): """ Subscribe to receive specific event types. Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events. Args: events: List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE]) Returns: bool: True if subscription was successful """ return super().subscribe_event(events)Subscribe to receive specific event types.
Note: Calling on_participant_event() automatically subscribes to EVENT_PARTICIPANT_JOIN and EVENT_PARTICIPANT_LEAVE events.
Args
events- List of event type constants (e.g., [EVENT_PARTICIPANT_JOIN, EVENT_PARTICIPANT_LEAVE])
Returns
bool- True if subscription was successful
def subscribe_video(self, user_id: int, subscribe: bool) ‑> None-
Expand source code
def subscribe_video(self, user_id: int, subscribe: bool) -> None: """Subscribe or unsubscribe from an individual participant's video stream. Args: user_id (int): The participant's user ID. subscribe (bool): True to subscribe, False to unsubscribe. """ return super().subscribe_video(user_id, subscribe)Subscribe or unsubscribe from an individual participant's video stream.
Args
user_id:int- The participant's user ID.
subscribe:bool- True to subscribe, False to unsubscribe.
def unsubscribeEvent(self, events)-
Expand source code
def unsubscribe_event(self, events): """ Unsubscribe from specific event types. Args: events: List of event type constants Returns: bool: True if unsubscription was successful """ return super().unsubscribe_event(events)Unsubscribe from specific event types.
Args
events- List of event type constants
Returns
bool- True if unsubscription was successful
def unsubscribe_event(self, events)-
Expand source code
def unsubscribe_event(self, events): """ Unsubscribe from specific event types. Args: events: List of event type constants Returns: bool: True if unsubscription was successful """ return super().unsubscribe_event(events)Unsubscribe from specific event types.
Args
events- List of event type constants
Returns
bool- True if unsubscription was successful
class DataOption (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var AUDIO_MIXED_STREAM-
The type of the None singleton.
var AUDIO_MULTI_STREAMS-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var VIDEO_MIXED_GALLERY_VIEW-
The type of the None singleton.
var VIDEO_MIXED_SPEAKER_VIEW-
The type of the None singleton.
var VIDEO_SINGLE_ACTIVE_STREAM-
The type of the None singleton.
var VIDEO_SINGLE_INDIVIDUAL_STREAM-
The type of the None singleton.
class AudioDataOption (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var AUDIO_MIXED_STREAM-
The type of the None singleton.
var AUDIO_MULTI_STREAMS-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var VIDEO_MIXED_GALLERY_VIEW-
The type of the None singleton.
var VIDEO_MIXED_SPEAKER_VIEW-
The type of the None singleton.
var VIDEO_SINGLE_ACTIVE_STREAM-
The type of the None singleton.
var VIDEO_SINGLE_INDIVIDUAL_STREAM-
The type of the None singleton.
class VideoDataOption (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var AUDIO_MIXED_STREAM-
The type of the None singleton.
var AUDIO_MULTI_STREAMS-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var VIDEO_MIXED_GALLERY_VIEW-
The type of the None singleton.
var VIDEO_MIXED_SPEAKER_VIEW-
The type of the None singleton.
var VIDEO_SINGLE_ACTIVE_STREAM-
The type of the None singleton.
var VIDEO_SINGLE_INDIVIDUAL_STREAM-
The type of the None singleton.
class DeskshareParams (*args, **kwargs)-
__init__(*args, **kwargs) Overloaded function.
-
__init__(self: rtms._rtms.DeskshareParams) -> None
-
__init__(self: rtms._rtms.DeskshareParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, resolution: typing.SupportsInt | typing.SupportsIndex, fps: typing.SupportsInt | typing.SupportsIndex) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
-
class EventLoop (poll_interval: float = 0.01, name: str = None)-
Expand source code
class EventLoop:
    """
    An SDK I/O thread that owns one or more Client lifecycles.

    The Zoom C SDK requires that alloc(), join(), poll(), and release() all run
    on the same OS thread. EventLoop is that thread. Clients assigned to a loop
    via add() will have their entire lifecycle managed on the loop's thread.

    Usage::

        loop = rtms.EventLoop()

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client(executor=EXECUTOR)
            client.on_audio_data(on_audio)
            loop.add(client)
            client.join(payload['payload'])

        await loop.run_async()   # or loop.run() to block

    Callbacks are dispatched according to the executor set on each Client:
    - No executor: callback runs inline on the loop's thread (simple, low latency)
    - executor=ThreadPoolExecutor(...): heavy work offloaded to worker pool
    - async def callback: bridged to the asyncio event loop via run_coroutine_threadsafe
    """

    def __init__(self, poll_interval: float = 0.01, name: Optional[str] = None):
        """
        Args:
            poll_interval: Seconds between poll cycles (default: 0.01 = 10ms)
            name: Optional thread name for debugging
        """
        self._poll_interval = poll_interval
        self._name = name
        self._clients: List['Client'] = []
        self._clients_lock = threading.Lock()
        self._pending: List['Client'] = []  # clients waiting for alloc+join on this thread
        self._pending_lock = threading.Lock()
        self._running = False
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    @property
    def client_count(self) -> int:
        """Number of clients currently owned by this loop."""
        with self._clients_lock:
            return len(self._clients)

    def add(self, client: 'Client') -> None:
        """
        Assign a client to this loop's thread.

        Must be called before client.join(). The loop's thread will call
        alloc() and join() on behalf of the client.

        Args:
            client: A Client whose join() will be deferred to this loop's thread
        """
        client._assigned_loop = self
        with self._pending_lock:
            self._pending.append(client)

    def _drain_pending(self) -> None:
        """Called from the loop's thread — alloc and join all waiting clients."""
        # Snapshot and clear under the lock so add() is never blocked while
        # the (potentially slow) alloc/join calls run.
        with self._pending_lock:
            pending = self._pending[:]
            self._pending.clear()
        for client in pending:
            try:
                client._do_alloc_and_join()
                with self._clients_lock:
                    self._clients.append(client)
            except Exception as e:
                # A client that fails to join is logged and dropped; it is
                # neither re-queued nor added to the active list.
                log_error('eventloop', f'Failed to alloc/join client: {e}')
                traceback.print_exc()

    def _poll_all(self) -> None:
        """Poll all active clients. Removes clients that have left."""
        # Poll a snapshot so the lock is not held while client code runs.
        with self._clients_lock:
            active = self._clients[:]
        to_remove = []
        for client in active:
            if client._running:
                try:
                    client.poll()
                except Exception as e:
                    log_error('eventloop', f'Error polling client: {e}')
                    to_remove.append(client)
            else:
                to_remove.append(client)
        if to_remove:
            # Rebuild rather than mutate in place: clients appended since the
            # snapshot are preserved, and the lock is taken only once.
            with self._clients_lock:
                self._clients = [c for c in self._clients if c not in to_remove]

    def run(self, stop_on_empty: bool = False) -> None:
        """
        Run the event loop on the current thread (blocking).

        The current thread becomes the SDK I/O thread. Use this when you want
        explicit control of which thread drives the loop. A KeyboardInterrupt
        raised inside the loop ends it quietly.

        Args:
            stop_on_empty: Stop automatically when all clients have left
        """
        self._running = True
        self._stop_event.clear()
        log_info('eventloop',
                 f'Starting event loop{" (" + self._name + ")" if self._name else ""} '
                 f'(poll_interval={self._poll_interval}s)')
        try:
            while self._running and not self._stop_event.is_set():
                self._drain_pending()
                self._poll_all()
                if stop_on_empty:
                    with self._clients_lock:
                        if not self._clients and not self._pending:
                            break
                time.sleep(self._poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self._running = False
            log_debug('eventloop', 'Event loop stopped')

    async def run_async(self, stop_on_empty: bool = False) -> None:
        """
        Run the event loop as an asyncio coroutine.

        Yields to the asyncio event loop between poll cycles so other
        coroutines (aiohttp, FastAPI, asyncpg, etc.) run freely. Async
        callbacks registered on clients are automatically bridged to this
        event loop.

        Args:
            stop_on_empty: Stop automatically when all clients have left

        Raises:
            asyncio.CancelledError: Re-raised after logging when the enclosing
                task is cancelled, so the cancellation reaches the awaiter.

        Example::

            async def main():
                loop = rtms.EventLoop()
                await asyncio.gather(loop.run_async(), aiohttp_app.start())

            asyncio.run(main())
        """
        self._running = True
        self._stop_event.clear()
        log_info('eventloop',
                 f'Starting async event loop{" (" + self._name + ")" if self._name else ""} '
                 f'(poll_interval={self._poll_interval}s)')
        try:
            while self._running and not self._stop_event.is_set():
                self._drain_pending()
                self._poll_all()
                if stop_on_empty:
                    with self._clients_lock:
                        if not self._clients and not self._pending:
                            break
                await asyncio.sleep(self._poll_interval)
        except asyncio.CancelledError:
            log_info('eventloop', 'Async event loop cancelled')
            # Swallowing the cancellation would leave the task looking like it
            # finished normally; propagate it as asyncio expects.
            raise
        finally:
            self._running = False
            log_debug('eventloop', 'Async event loop stopped')

    def start(self) -> 'EventLoop':
        """
        Start the event loop in a background daemon thread.

        Returns self for chaining::

            loop = rtms.EventLoop().start()

        The thread runs until stop() is called or the process exits.
        """
        self._thread = threading.Thread(
            target=self.run,
            name=self._name or 'rtms-eventloop',
            daemon=True,
        )
        self._thread.start()
        log_debug('eventloop', f'Background thread started: {self._thread.name}')
        return self

    def stop(self) -> None:
        """Signal the event loop to stop after the current poll cycle."""
        self._running = False
        self._stop_event.set()

    def join(self, timeout: Optional[float] = None) -> None:
        """Wait for the background thread to finish (only valid after start())."""
        if self._thread:
            self._thread.join(timeout=timeout)
The Zoom C SDK requires that alloc(), join(), poll(), and release() all run on the same OS thread. EventLoop is that thread. Clients assigned to a loop via add() will have their entire lifecycle managed on the loop's thread.
Usage::
loop = rtms.EventLoop()

@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client(executor=EXECUTOR)
    client.on_audio_data(on_audio)
    loop.add(client)
    client.join(payload['payload'])

await loop.run_async()  # or loop.run() to block

Callbacks are dispatched according to the executor set on each Client:
- No executor: callback runs inline on the loop's thread (simple, low latency)
- executor=ThreadPoolExecutor(…): heavy work offloaded to worker pool
- async def callback: bridged to the asyncio event loop via run_coroutine_threadsafe
Args
poll_interval- Seconds between poll cycles (default: 0.01 = 10ms)
name- Optional thread name for debugging
Instance variables
prop client_count : int-
Expand source code
@property
def client_count(self) -> int:
    """Return how many clients this loop currently owns."""
    # Take the length under the lock, then return once it is released.
    with self._clients_lock:
        owned = len(self._clients)
    return owned
Methods
def add(self, client: Client) ‑> None-
Expand source code
def add(self, client: 'Client') -> None:
    """
    Hand a client over to this loop's thread.

    Call this before client.join(); the loop's thread then performs alloc()
    and join() on the client's behalf.

    Args:
        client: A Client whose join() will be deferred to this loop's thread
    """
    # Record the owning loop on the client before queueing it.
    client._assigned_loop = self
    with self._pending_lock:
        waiting = self._pending
        waiting.append(client)
Must be called before client.join(). The loop's thread will call alloc() and join() on behalf of the client.
Args
client- A Client whose join() will be deferred to this loop's thread
def join(self, timeout: float = None) ‑> None-
Expand source code
def join(self, timeout: float = None) -> None:
    """Block until the background thread exits (only meaningful after start())."""
    worker = self._thread
    if not worker:
        # start() was never called, so there is nothing to wait for.
        return
    worker.join(timeout=timeout)
def run(self, stop_on_empty: bool = False) ‑> None-
Expand source code
def run(self, stop_on_empty: bool = False) -> None:
    """
    Drive the event loop on the calling thread until it is stopped.

    The calling thread becomes the SDK I/O thread. Each cycle joins any
    queued clients, polls the active ones, then sleeps for the poll interval.
    A KeyboardInterrupt ends the loop quietly.

    Args:
        stop_on_empty: Stop automatically when all clients have left
    """
    self._running = True
    self._stop_event.clear()
    label = " (" + self._name + ")" if self._name else ""
    log_info('eventloop',
             f'Starting event loop{label} '
             f'(poll_interval={self._poll_interval}s)')
    try:
        while True:
            if not self._running or self._stop_event.is_set():
                break
            self._drain_pending()
            self._poll_all()
            if stop_on_empty:
                # Idle means no active clients and nothing queued to join.
                with self._clients_lock:
                    idle = not (self._clients or self._pending)
                if idle:
                    break
            time.sleep(self._poll_interval)
    except KeyboardInterrupt:
        pass
    finally:
        self._running = False
        log_debug('eventloop', 'Event loop stopped')
The current thread becomes the SDK I/O thread. Use this when you want explicit control of which thread drives the loop.
Args
stop_on_empty- Stop automatically when all clients have left
async def run_async(self, stop_on_empty: bool = False) ‑> None-
Expand source code
async def run_async(self, stop_on_empty: bool = False) -> None:
    """
    Run the event loop as an asyncio coroutine.

    Yields to the asyncio event loop between poll cycles so other coroutines
    (aiohttp, FastAPI, asyncpg, etc.) run freely. Async callbacks registered
    on clients are automatically bridged to this event loop.

    Args:
        stop_on_empty: Stop automatically when all clients have left

    Raises:
        asyncio.CancelledError: Re-raised after logging when the enclosing
            task is cancelled, so the cancellation reaches the awaiter.

    Example::

        async def main():
            loop = rtms.EventLoop()
            await asyncio.gather(loop.run_async(), aiohttp_app.start())

        asyncio.run(main())
    """
    self._running = True
    self._stop_event.clear()
    log_info('eventloop',
             f'Starting async event loop{" (" + self._name + ")" if self._name else ""} '
             f'(poll_interval={self._poll_interval}s)')
    try:
        while self._running and not self._stop_event.is_set():
            self._drain_pending()
            self._poll_all()
            if stop_on_empty:
                with self._clients_lock:
                    if not self._clients and not self._pending:
                        break
            await asyncio.sleep(self._poll_interval)
    except asyncio.CancelledError:
        log_info('eventloop', 'Async event loop cancelled')
        # Swallowing the cancellation would leave the task looking like it
        # finished normally; propagate it as asyncio expects.
        raise
    finally:
        self._running = False
        log_debug('eventloop', 'Async event loop stopped')
Yields to the asyncio event loop between poll cycles so other coroutines (aiohttp, FastAPI, asyncpg, etc.) run freely. Async callbacks registered on clients are automatically bridged to this event loop.
Args
stop_on_empty- Stop automatically when all clients have left
Example::
async def main():
    loop = rtms.EventLoop()
    await asyncio.gather(loop.run_async(), aiohttp_app.start())

asyncio.run(main())
def start(self) ‑> EventLoop-
Expand source code
def start(self) -> 'EventLoop':
    """
    Launch run() on a background daemon thread.

    Returns self so the call can be chained::

        loop = rtms.EventLoop().start()

    The thread runs until stop() is called or the process exits.
    """
    # Fall back to a fixed thread name when the loop was created without one.
    thread_name = self._name or 'rtms-eventloop'
    worker = threading.Thread(target=self.run, name=thread_name, daemon=True)
    self._thread = worker
    worker.start()
    log_debug('eventloop', f'Background thread started: {self._thread.name}')
    return self
Returns self for chaining::
loop = rtms.EventLoop().start()
The thread runs until stop() is called or the process exits.
def stop(self) ‑> None-
Expand source code
def stop(self) -> None:
    """Signal the event loop to stop after the current poll cycle."""
    # run() and run_async() test both the flag and the event in their
    # while condition, so either one ends the loop at the next check.
    self._running = False
    self._stop_event.set()
class EventLoopPool (threads: int = 4, poll_interval: float = 0.01, strategy: str = 'least_loaded')-
Expand source code
class EventLoopPool:
    """
    A pool of EventLoop threads that distributes clients across N SDK I/O threads.

    Use this for high-concurrency deployments where many clients share a fixed
    number of threads. Each client is permanently assigned to one loop for its
    entire lifetime.

    Usage::

        pool = rtms.EventLoopPool(threads=4)

        @rtms.on_webhook_event
        def handle(payload):
            client = rtms.Client(executor=EXECUTOR)
            client.on_audio_data(on_audio)
            pool.add(client)   # routed to least-loaded loop
            client.join(payload['payload'])

        await pool.run_async()   # or pool.run()

    Scaling guidance:
    - 1 thread per ~25 clients is a reasonable starting point
    - Use executor= on Client for CPU/IO-heavy callbacks
    - Monitor loop.client_count to tune thread count
    """

    def __init__(
        self,
        threads: int = 4,
        poll_interval: float = 0.01,
        strategy: str = 'least_loaded',
    ):
        """
        Args:
            threads: Number of SDK I/O threads (default: 4)
            poll_interval: Seconds between poll cycles per loop (default: 0.01)
            strategy: Client routing strategy — 'least_loaded' or 'round_robin'

        Raises:
            ValueError: If threads < 1 or strategy is not a known name.
        """
        if threads < 1:
            raise ValueError("threads must be >= 1")
        if strategy not in ('least_loaded', 'round_robin'):
            raise ValueError("strategy must be 'least_loaded' or 'round_robin'")
        self._loops = [
            EventLoop(poll_interval=poll_interval, name=f'rtms-pool-{i}')
            for i in range(threads)
        ]
        self._strategy = strategy
        self._rr_index = 0
        self._rr_lock = threading.Lock()

    @property
    def loops(self) -> List['EventLoop']:
        """The underlying EventLoop list."""
        return self._loops

    @property
    def client_count(self) -> int:
        """Total clients across all loops."""
        return sum(loop.client_count for loop in self._loops)

    @staticmethod
    def _load(loop: 'EventLoop') -> int:
        """Return the number of clients a loop owns plus those still queued on it."""
        # client_count only covers clients the loop has already joined;
        # clients handed over by add() sit in _pending until the next cycle.
        with loop._pending_lock:
            queued = len(loop._pending)
        return loop.client_count + queued

    def add(self, client: 'Client') -> 'EventLoop':
        """
        Assign a client to a loop according to the routing strategy.

        For 'least_loaded', load counts both joined clients and clients still
        waiting to be joined, so several add() calls made before the loops
        drain their queues are spread out instead of all landing on one loop.

        Returns the EventLoop the client was assigned to.
        """
        if self._strategy == 'least_loaded':
            loop = min(self._loops, key=self._load)
        else:  # round_robin
            with self._rr_lock:
                loop = self._loops[self._rr_index % len(self._loops)]
                self._rr_index += 1
        loop.add(client)
        return loop

    def run(self, stop_on_empty: bool = False) -> None:
        """
        Run all loops. Starts N-1 loops as background daemon threads and
        runs the last one on the current thread (blocking).

        Args:
            stop_on_empty: Forwarded only to the loop run on the current
                thread; the background loops are started with start().
        """
        for loop in self._loops[:-1]:
            loop.start()
        self._loops[-1].run(stop_on_empty=stop_on_empty)

    async def run_async(self, stop_on_empty: bool = False) -> None:
        """
        Run all loops as asyncio coroutines concurrently.

        Args:
            stop_on_empty: Forwarded to every loop's run_async()
        """
        await asyncio.gather(
            *[loop.run_async(stop_on_empty=stop_on_empty) for loop in self._loops]
        )

    def stop(self) -> None:
        """Stop all loops."""
        for loop in self._loops:
            loop.stop()
Use this for high-concurrency deployments where many clients share a fixed number of threads. Each client is permanently assigned to one loop for its entire lifetime.
Usage::
pool = rtms.EventLoopPool(threads=4)

@rtms.on_webhook_event
def handle(payload):
    client = rtms.Client(executor=EXECUTOR)
    client.on_audio_data(on_audio)
    pool.add(client)  # routed to least-loaded loop
    client.join(payload['payload'])

await pool.run_async()  # or pool.run()

Scaling guidance:
- 1 thread per ~25 clients is a reasonable starting point
- Use executor= on Client for CPU/IO-heavy callbacks
- Monitor loop.client_count to tune thread count
Args
threads- Number of SDK I/O threads (default: 4)
poll_interval- Seconds between poll cycles per loop (default: 0.01)
strategy- Client routing strategy — 'least_loaded' or 'round_robin'
Instance variables
prop client_count : int-
Expand source code
@property
def client_count(self) -> int:
    """Return the combined client count of every loop in the pool."""
    total = 0
    for member in self._loops:
        total += member.client_count
    return total
prop loops : List[EventLoop]-
Expand source code
@property
def loops(self) -> List[EventLoop]:
    """The underlying EventLoop list."""
    # Returns the pool's own list object, not a copy.
    return self._loops
Methods
def add(self, client: Client) ‑> EventLoop-
Expand source code
def add(self, client: 'Client') -> 'EventLoop':
    """
    Assign a client to a loop according to the routing strategy.

    For 'least_loaded', load counts both joined clients and clients still
    waiting to be joined, so several add() calls made before the loops drain
    their queues are spread out instead of all landing on one loop.

    Returns the EventLoop the client was assigned to.
    """
    if self._strategy == 'least_loaded':
        def load(candidate) -> int:
            # client_count only covers clients the loop has already joined;
            # clients handed over by add() sit in _pending until then.
            with candidate._pending_lock:
                queued = len(candidate._pending)
            return candidate.client_count + queued

        loop = min(self._loops, key=load)
    else:  # round_robin
        with self._rr_lock:
            loop = self._loops[self._rr_index % len(self._loops)]
            self._rr_index += 1
    loop.add(client)
    return loop
Returns the EventLoop the client was assigned to.
def run(self, stop_on_empty: bool = False) ‑> None-
Expand source code
def run(self, stop_on_empty: bool = False) -> None:
    """
    Run every loop in the pool.

    All loops except the last are started as background daemon threads; the
    last one runs on the calling thread and blocks until it stops.
    """
    background = self._loops[:-1]
    for worker in background:
        worker.start()
    foreground = self._loops[-1]
    foreground.run(stop_on_empty=stop_on_empty)
async def run_async(self, stop_on_empty: bool = False) ‑> None-
Expand source code
async def run_async(self, stop_on_empty: bool = False) -> None:
    """
    Await every loop's run_async() concurrently, passing stop_on_empty through.
    """
    runners = []
    for member in self._loops:
        runners.append(member.run_async(stop_on_empty=stop_on_empty))
    await asyncio.gather(*runners)
def stop(self) ‑> None-
Expand source code
def stop(self) -> None:
    """Ask every loop in the pool to stop."""
    for member in self._loops:
        member.stop()
class EventType (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var ACTIVE_SPEAKER_CHANGE-
The type of the None singleton.
var CONSUMER_ANSWERED-
The type of the None singleton.
var CONSUMER_END-
The type of the None singleton.
var FIRST_PACKET_TIMESTAMP-
The type of the None singleton.
var MEDIA_CONNECTION_INTERRUPTED-
The type of the None singleton.
var PARTICIPANT_JOIN-
The type of the None singleton.
var PARTICIPANT_LEAVE-
The type of the None singleton.
var PARTICIPANT_VIDEO_OFF-
The type of the None singleton.
var PARTICIPANT_VIDEO_ON-
The type of the None singleton.
var SHARING_START-
The type of the None singleton.
var SHARING_STOP-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var USER_ANSWERED-
The type of the None singleton.
var USER_END-
The type of the None singleton.
var USER_HOLD-
The type of the None singleton.
var USER_UNHOLD-
The type of the None singleton.
class LogFormat (*args, **kwds)-
Expand source code
class LogFormat(str, Enum):
    """Available log output formats"""
    # Members also subclass str, so each compares equal to its plain string value.
    PROGRESSIVE = 'progressive'
    JSON = 'json'
Ancestors
- builtins.str
- enum.Enum
Class variables
var JSON-
The type of the None singleton.
var PROGRESSIVE-
The type of the None singleton.
class LogLevel (*args, **kwds)-
Expand source code
class LogLevel(IntEnum):
    """Available log levels for RTMS SDK logging"""
    # Integer values run from ERROR (0) up to TRACE (4).
    # NOTE(review): presumably a higher value means more verbose output —
    # confirm against the logger's filtering.
    ERROR = 0
    WARN = 1
    INFO = 2
    DEBUG = 3
    TRACE = 4
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var DEBUG-
The type of the None singleton.
var ERROR-
The type of the None singleton.
var INFO-
The type of the None singleton.
var TRACE-
The type of the None singleton.
var WARN-
The type of the None singleton.
class MediaDataType (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var ALL-
The type of the None singleton.
var AUDIO-
The type of the None singleton.
var CHAT-
The type of the None singleton.
var DESKSHARE-
The type of the None singleton.
var TRANSCRIPT-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var VIDEO-
The type of the None singleton.
class MessageType (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var CLIENT_READY_ACK-
The type of the None singleton.
var DATA_HAND_SHAKE_REQ-
The type of the None singleton.
var DATA_HAND_SHAKE_RESP-
The type of the None singleton.
var EVENT_SUBSCRIPTION-
The type of the None singleton.
var EVENT_UPDATE-
The type of the None singleton.
var KEEP_ALIVE_REQ-
The type of the None singleton.
var KEEP_ALIVE_RESP-
The type of the None singleton.
var MEDIA_DATA_AUDIO-
The type of the None singleton.
var MEDIA_DATA_CHAT-
The type of the None singleton.
var MEDIA_DATA_SHARE-
The type of the None singleton.
var MEDIA_DATA_TRANSCRIPT-
The type of the None singleton.
var MEDIA_DATA_VIDEO-
The type of the None singleton.
var META_DATA_AUDIO-
The type of the None singleton.
var META_DATA_CHAT-
The type of the None singleton.
var META_DATA_SHARE-
The type of the None singleton.
var META_DATA_TRANSCRIPT-
The type of the None singleton.
var META_DATA_VIDEO-
The type of the None singleton.
var SESSION_STATE_REQ-
The type of the None singleton.
var SESSION_STATE_RESP-
The type of the None singleton.
var SESSION_STATE_UPDATE-
The type of the None singleton.
var SIGNALING_HAND_SHAKE_REQ-
The type of the None singleton.
var SIGNALING_HAND_SHAKE_RESP-
The type of the None singleton.
var STREAM_CLOSE_REQ-
The type of the None singleton.
var STREAM_CLOSE_RESP-
The type of the None singleton.
var STREAM_STATE_REQ-
The type of the None singleton.
var STREAM_STATE_RESP-
The type of the None singleton.
var STREAM_STATE_UPDATE-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var VIDEO_SUBSCRIPTION_REQ-
The type of the None singleton.
var VIDEO_SUBSCRIPTION_RESP-
The type of the None singleton.
class Metadata (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop aiInterpreter
prop endTs
prop startTs
prop userId
prop userName
class Participant (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop id
prop name
class Session (*args, **kwargs)-
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop isActive
prop isPaused
prop meetingId
prop sessionId
prop statTime
prop status
prop streamId
class SessionState (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var INACTIVE-
The type of the None singleton.
var INITIALIZE-
The type of the None singleton.
var PAUSED-
The type of the None singleton.
var RESUMED-
The type of the None singleton.
var STARTED-
The type of the None singleton.
var STOPPED-
The type of the None singleton.
class StopReason (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var ADMIN_DISABLED_APP-
The type of the None singleton.
var AGENT_DISCONNECTED-
The type of the None singleton.
var ALL_APPS_DISABLED-
The type of the None singleton.
var AUTHENTICATION_FAILURE-
The type of the None singleton.
var AWAIT_RECONNECTION_TIMEOUT-
The type of the None singleton.
var CONNECTION_TIMEOUT-
The type of the None singleton.
var CUSTOMER_DISCONNECTED-
The type of the None singleton.
var DATA_CONNECTION_CLOSED_ABNORMALLY-
The type of the None singleton.
var DATA_CONNECTION_INTERRUPTED-
The type of the None singleton.
var EXIT_SIGNAL-
The type of the None singleton.
var HOST_DISABLED_APP-
The type of the None singleton.
var HOST_TRIGGERED-
The type of the None singleton.
var INSTANCE_CONNECTION_INTERRUPTED-
The type of the None singleton.
var INTERNAL_EXCEPTION-
The type of the None singleton.
var KEEP_ALIVE_TIMEOUT-
The type of the None singleton.
var MANUAL_API_TRIGGERED-
The type of the None singleton.
var MEETING_ENDED-
The type of the None singleton.
var RECEIVER_REQUEST_CLOSE-
The type of the None singleton.
var SIGNAL_CONNECTION_CLOSED_ABNORMALLY-
The type of the None singleton.
var SIGNAL_CONNECTION_INTERRUPTED-
The type of the None singleton.
var STREAMING_NOT_SUPPORTED-
The type of the None singleton.
var STREAM_CANCELED-
The type of the None singleton.
var STREAM_REVOKED-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
var USER_EJECTED-
The type of the None singleton.
var USER_LEFT-
The type of the None singleton.
var USER_TRIGGERED-
The type of the None singleton.
class StreamState (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var ACTIVE-
The type of the None singleton.
var INACTIVE-
The type of the None singleton.
var INTERRUPTED-
The type of the None singleton.
var PAUSED-
The type of the None singleton.
var RESUMED-
The type of the None singleton.
var TERMINATED-
The type of the None singleton.
var TERMINATING-
The type of the None singleton.
class TranscriptLanguage (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var ARABIC-
The type of the None singleton.
var BENGALI-
The type of the None singleton.
var CANTONESE-
The type of the None singleton.
var CATALAN-
The type of the None singleton.
var CHINESE_SIMPLIFIED-
The type of the None singleton.
var CHINESE_TRADITIONAL-
The type of the None singleton.
var CZECH-
The type of the None singleton.
var DANISH-
The type of the None singleton.
var DUTCH-
The type of the None singleton.
var ENGLISH-
The type of the None singleton.
var ESTONIAN-
The type of the None singleton.
var FINNISH-
The type of the None singleton.
var FRENCH_CANADA-
The type of the None singleton.
var FRENCH_FRANCE-
The type of the None singleton.
var GERMAN-
The type of the None singleton.
var HEBREW-
The type of the None singleton.
var HINDI-
The type of the None singleton.
var HUNGARIAN-
The type of the None singleton.
var INDONESIAN-
The type of the None singleton.
var ITALIAN-
The type of the None singleton.
var JAPANESE-
The type of the None singleton.
var KOREAN-
The type of the None singleton.
var MALAY-
The type of the None singleton.
var NONE-
The type of the None singleton.
var PERSIAN-
The type of the None singleton.
var POLISH-
The type of the None singleton.
var PORTUGUESE-
The type of the None singleton.
var ROMANIAN-
The type of the None singleton.
var RUSSIAN-
The type of the None singleton.
var SPANISH-
The type of the None singleton.
var SWEDISH-
The type of the None singleton.
var TAGALOG-
The type of the None singleton.
var TAMIL-
The type of the None singleton.
var TELUGU-
The type of the None singleton.
var THAI-
The type of the None singleton.
var TURKISH-
The type of the None singleton.
var UKRAINIAN-
The type of the None singleton.
var VIETNAMESE-
The type of the None singleton.
class TranscriptParams (...)-
__init__(self: rtms._rtms.TranscriptParams) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop contentType
prop content_type
prop enableLid
prop enable_lid
prop srcLanguage
prop src_language
class VideoCodec (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var H264-
The type of the None singleton.
var JPG-
The type of the None singleton.
var PNG-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
class VideoContentType (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var FILE_STREAM-
The type of the None singleton.
var RAW_VIDEO-
The type of the None singleton.
var RTP-
The type of the None singleton.
var TEXT-
The type of the None singleton.
var UNDEFINED-
The type of the None singleton.
class VideoParams (...)-
__init__(*args, **kwargs) Overloaded function.
-
__init__(self: rtms._rtms.VideoParams) -> None
-
__init__(self: rtms._rtms.VideoParams, content_type: typing.SupportsInt | typing.SupportsIndex, codec: typing.SupportsInt | typing.SupportsIndex, resolution: typing.SupportsInt | typing.SupportsIndex, data_opt: typing.SupportsInt | typing.SupportsIndex, fps: typing.SupportsInt | typing.SupportsIndex) -> None
Ancestors
- pybind11_builtins.pybind11_object
Instance variables
prop codec
prop contentType
prop content_type
prop dataOpt
prop data_opt
prop fps
prop resolution
-
class VideoResolution (*args, **kwds)-
Enum where members are also (and must be) ints
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var FHD-
The type of the None singleton.
var HD-
The type of the None singleton.
var QHD-
The type of the None singleton.
var SD-
The type of the None singleton.