Skip to main content

Python3 SDK Signatures

Overview

The Python SDK signatures originate from the C SDK, encapsulated within a C++ wrapper and integrated using SWIG. These functions oversee the entire lifecycle of an xApp in Python, offering a simplified version of their C counterparts. First, we will overview each signature, followed by a detailed exploration of each:

SignatureDescription
init()Initializes the xApp and sends an E42-Setup-Request to NearRT-RIC based on the given configurations
get_cust_sm_conf()Get customized service model configurations (e.g., service model name, subscription period) read from the xApp configuration file
get_oran_sm_conf)Get O-RAN standarized service model configurations (e.g., service model name, subscription period, and action definitions) read from the xApp configuration file
conn_e2_nodes()Get connected E2-Nodes' information (e.g., NodeB ID, RAN type and supported RAN functions, etc)
report_{SM-NAME}_sm()Sends a RIC-Subscription-Request to subscribe the RAN function on the specific E2-Node via the service model
{SM-NAME}Callback()A class callback of service model(s) for handling indication messages received by the xApp
control_{SM-NAME}_sm()Sends a RIC-Conctrol-Request to the RAN function at the specific E2-Node via the service model
rm_report_{SM-NAME}_sm()Sends a RIC-Delete-Subscription-Request to unsubscribes the subscribed RAN functions
try_stop()Terminates the xApp

Note: {SM-NAME} serves as a placeholder for the name of a specific service model, for example: if {SM-NAME} = kpm which stands for Key Performance Measurement service model, the function names will be report_kpm_sm, sm_kpm_rc, KPMCallback() and so on. The supported {SM-NAME}s in Python SDK include:

  • Customized service models: mac, rlc, pdcp, gtp, slice
  • O-RAN standarized service models: kpm

Sequence Diagram

The following figures illustrates the sequence diagram of lifecycle in a Python xApp, including:

  1. Init: initialize xApp (from step 2 to 11)
  2. Report Service: subscribe RAN function via service model (from step 12 to 20)
  3. xApp Logic: store or process receiving data from RAN functions (from step 21 to 23)
  4. Control Service: control RAN function via service model (from step 24 to 30)
  5. Exit: termiate xApp (from step 31 to 41)

This helps us understand what is the role of each signature and when it should be used.

Detail of Signatures


Init - Step 2 to 11


init()

  • Example:

    # import Python SDK
    import xapp_sdk as ric

    # Call init function and give command-line arguments as input
    ric.init(sys.argv)
  • Input:

    ParameterType in PythonType in Swig Wrapper C++Comment
    sys.argvlistvector<string>& argvThis parameter captures all the command-line arguments, including any options and their respective values
  • Output: N/A


get_cust_sm_conf()

  • Example:

    # Get customized SMs' configurations
    cust_sm = ric.get_cust_sm_conf()

    # Loop cust_sm to get each SM confiugration object
    for sm_info in cust_sm:
    # Get name
    sm_name = sm_info.name

    # Get time
    sm_time = sm_info.time
    # Convert type of time from string to enum defined in swig wrapper for subscription function
    tti = get_cust_tti(sm_time)
  • Input: N/A

  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    cust_smtuplevector<swig_sub_cust_sm_t>This parameter contains the configured customized service models' information read from Sub_CUST_SM_List in xApp configuration file (e.g., xapp.conf)

get_oran_sm_conf()

  • Example:

    # Get O-RAN standarized SMs' configurations
    oran_sm = ric.get_oran_sm_conf()

    # Loop oran_sm to get each SM confiugration object
    for sm_info in cust_sm:
    # Get name
    sm_name = sm_info.name

    # Get time
    sm_time = sm_info.time
    # Convert type of time from int to enum defined in swig wrapper for subscription function
    tti = get_oran_tti(sm_time)

    # Get format of action definition
    sm_format = sm_info.format

    # Get RAN type
    ran_type = sm_info.ran_type

    # Get lenght of action definition
    act_len = sm_info.act_len

    # Get configured measurment names and store them in a list to create action definition in SWIG level
    act = []
    for act_name in sm_info.actions:
    act.append(act_name)
  • Input: N/A

  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    oran_smtuplevector<swig_sub_oran_sm_t>This parameter contains the configured O-RAN standarized service models' information read from Sub_ORAN_SM_List in xApp configuration file

conn_e2_nodes()

  • Example:

    # Get connected E2-Nodes
    conn = ric.conn_e2_nodes()

    # Case 1: if no E2-Node connected, do no excuting xApp anymore
    assert(len(conn) > 0)

    # Case 2: if the number of connected E2-Nodes is greater than 0
    # Loop conn to get each connected E2-Node' information
    for i in range(0, len(conn)):
    # Get NodeB ID
    nb_id = conn[i].id.nb_id.nb_id

    # Get PLMN
    plmn_mcc = conn[i].id.plmn.mcc
    plmn_mnc = conn[i].id.plmn.mnc

    # Get RAN type
    e2node_type = conn[i].id.type

    # Check the configured RAN type is the same as the connceted E2-Node's RAN type
    # Note: configured_ran_type is read from the xApp configuration file
    if configured_ran_type == ric.get_e2ap_ngran_name(e2node_type):
    # do somthing, i.e., send subscription request

  • Input: N/A

  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    conntuplevector<E2Node>This parameter contains the connected E2-Nodes' information, including globle E2-Node ID and RAN function definitions

Report Service - Step 12 to 20


report_{SM-NAME}_sm()

  • Example - Customized SM: {SM-NAME} = mac

    # Define a handler to save the RIC Request ID
    mac_hndlr = []

    # Subscribe MAC RAN functions on all the connected E2-Nodes
    for i in range(0, len(conn)):
    # define a callback for MAC SM
    mac_cb = MACCallback()

    # Send subscription request to conn[i] by giving:
    # global E2-Node ID, transmission time intervale of indicaiton message, and defined callback function
    hndlr = ric.report_mac_sm(conn[i].id, tti, mac_cb)

    # Append the RIC Request ID
    mac_hndlr.append(hndlr)
  • Input:

    ParameterType in PythonType in Swig Wrapper C++Comment
    conn[i].idxapp_sdk.swig_global_e2_node_id_tswig_global_e2_node_id_t*Stands for Global E2-Node ID
    ttiintIntervalTime duration of each RIC Indication read from xapp.conf
    mac_cbclassmac_cb*Defined callback function in xApp
  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    hndlrintintRIC Request ID
  • Example - O-RAN Standarized SM: {SM-NAME} = kpm

    # Define a handler to save the RIC Request ID
    kpm_hndlr = []

    # Subscribe KPM RAN functions on all the connected E2-Nodes
    for i in range(0, len(conn)):
    # define a callback for KPM SM
    kpm_cb = KPMCallback()

    # Send subscription request to conn[i] by giving:
    # global E2-Node ID, transmission time intervale of indicaiton message,
    # list of measurment names, and defined callback function
    hndlr = ric.report_kpm_sm(conn[i].id, tti, act, kpm_cb)

    # Append the RIC Request ID
    kpm_hndlr.append(hndlr)
  • Input:

    ParameterType in PythonType in Swig Wrapper C++Comment
    conn[i].idxapp_sdk.swig_global_e2_node_id_tswig_global_e2_node_id_t*Stands for Global E2-Node ID
    ttiintIntervalTime duration of each RIC Indication read from xapp.conf
    actlistvector<string>& action*List of measurement names read from xapp.conf
    kpm_cbclasskpm_cb*Defined callback function in xApp
  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    hndlrintintRIC Request ID

xApp Logic - Step 21 to 23


{SM-NAME}Callback()

  • Example - Customized SM: {SM-NAME} = mac

    # MACCallback class is defined and derived from C++ class mac_cb
    class MACCallback(ric.mac_cb):
    # Define Python class 'constructor'
    def __init__(self):
    ric.mac_cb.__init__(self)

    # Override C++ method: virtual void handle(swig_mac_ind_msg_t a) = 0;
    def handle(self, ind):
    # Print value stored in swig_mac_ind_msg_t
    if len(ind.ue_stats) > 0:
    t_now = time.time_ns() / 1000.0
    t_mac = ind.tstamp / 1.0
    t_diff = t_now - t_mac
    print(f"MAC Indication tstamp {t_now} diff {t_diff} E2-node type {ind.id.type} nb_id {ind.id.nb_id.nb_id}")
  • Input of MACCallback():

    ParameterType in PythonType in Swig Wrapper C++Comment
    ric.mac_cb__main__.MACCallbackstructDefined struct in C++ includes a virtual function handle and a virtual destructor
  • Input of handle():

    ParameterType in PythonType in Swig Wrapper C++Comment
    indxapp_sdk.swig_mac_ind_msg_tswig_mac_ind_msg_t*A RIC Indication message storing data in swig_mac_ind_msg_t

Note: Once the RIC Subscription Request is received by the E2-Node, the xApp will continue to receive RIC Indications forwarded from the C SDK to the Python SDK via handle(). For further development (i.e., ML or AI usage), developers can process the received RIC Indications to analyze/optimize network performance.


Control Service - Step 24 to 30


control_{SM-NAME}_sm()

  • Example - Customized SM: {SM-NAME} = slice

    # Create slice control message by following the structure slice_ctrl_msg_t
    msg = fill_slice_ctrl_msg()

    # Send control request to conn[i] by giving:
    # global E2-Node ID and predefined contorl message
    ric.control_slice_sm(conn[i].id, msg)
  • Input:

    ParameterType in PythonType in Swig Wrapper C++Comment
    conn[i].idxapp_sdk.swig_global_e2_node_id_tswig_global_e2_node_id_t*Stands for Global E2-Node ID
    msgxapp_sdk.slice_ctrl_msg_tslice_ctrl_msg_t*Control message written by xApp
  • Output: N/A


Exit - Step 31 -41


rm_report_{SM-NAME]_sm()

  • Example - Customized SM: {SM-NAME} = mac

    # Loop list of handler to send subcription delete request to each recorded RIC Request ID
    for i in range(0, len(mac_hndlr)):
    ric.rm_report_mac_sm(mac_hndlr[i])
  • Input:

    ParameterType in PythonType in Swig Wrapper C++Comment
    mac_hndlr[i]intintRIC Request ID
  • Output: N/A


try_stop()

  • Example - Customized SM: {SM-NAME} = mac

    # Call the stop function every one second until it recevies a true value
    ans = ric.try_stop()
    while ans == 0:
    time.sleep(1)
    ans = ric.try_stop()
  • Input: N/A

  • Output:

    ParameterType in PythonType in Swig Wrapper C++Comment
    ansboolboolThe status of terminating an xApp: "TRUE" means the xApp has terminated completely, including freeing the memory; "FALSE" indicates the opposite.