Python3 SDK Signatures
Overview
The Python SDK signatures originate from the C SDK, encapsulated within a C++ wrapper and integrated using SWIG. These functions oversee the entire lifecycle of an xApp in Python, offering a simplified version of their C counterparts. First, we will overview each signature, followed by a detailed exploration of each:
Signature | Description |
---|---|
init() | Initializes the xApp and sends an E42-Setup-Request to NearRT-RIC based on the given configurations |
get_cust_sm_conf() | Get customized service model configurations (e.g., service model name, subscription period) read from the xApp configuration file |
get_oran_sm_conf) | Get O-RAN standarized service model configurations (e.g., service model name, subscription period, and action definitions) read from the xApp configuration file |
conn_e2_nodes() | Get connected E2-Nodes' information (e.g., NodeB ID, RAN type and supported RAN functions, etc) |
report_{SM-NAME}_sm() | Sends a RIC-Subscription-Request to subscribe the RAN function on the specific E2-Node via the service model |
{SM-NAME}Callback() | A class callback of service model(s) for handling indication messages received by the xApp |
control_{SM-NAME}_sm() | Sends a RIC-Conctrol-Request to the RAN function at the specific E2-Node via the service model |
rm_report_{SM-NAME}_sm() | Sends a RIC-Delete-Subscription-Request to unsubscribes the subscribed RAN functions |
try_stop() | Terminates the xApp |
Note:
{SM-NAME}
serves as a placeholder for the name of a specific service model, for example: if{SM-NAME} = kpm
which stands for Key Performance Measurement service model, the function names will bereport_kpm_sm
,sm_kpm_rc
,KPMCallback()
and so on. The supported{SM-NAME}
s in Python SDK include:
- Customized service models: mac, rlc, pdcp, gtp, slice
- O-RAN standarized service models: kpm
Sequence Diagram
The following figures illustrates the sequence diagram of lifecycle in a Python xApp, including:
- Init: initialize xApp (from step 2 to 11)
- Report Service: subscribe RAN function via service model (from step 12 to 20)
- xApp Logic: store or process receiving data from RAN functions (from step 21 to 23)
- Control Service: control RAN function via service model (from step 24 to 30)
- Exit: termiate xApp (from step 31 to 41)
This helps us understand what is the role of each signature and when it should be used.
Detail of Signatures
Init - Step 2 to 11
- Used functions
init()
-
Example:
# import Python SDK
import xapp_sdk as ric
# Call init function and give command-line arguments as input
ric.init(sys.argv) -
Input:
Parameter Type in Python Type in Swig Wrapper C++ Comment sys.argv
list
vector<string>& argv
This parameter captures all the command-line arguments, including any options and their respective values -
Output: N/A
get_cust_sm_conf()
-
Example:
# Get customized SMs' configurations
cust_sm = ric.get_cust_sm_conf()
# Loop cust_sm to get each SM confiugration object
for sm_info in cust_sm:
# Get name
sm_name = sm_info.name
# Get time
sm_time = sm_info.time
# Convert type of time from string to enum defined in swig wrapper for subscription function
tti = get_cust_tti(sm_time) -
Input: N/A
-
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment cust_sm
tuple
vector<swig_sub_cust_sm_t>
This parameter contains the configured customized service models' information read from Sub_CUST_SM_List
in xApp configuration file (e.g., xapp.conf)
get_oran_sm_conf()
-
Example:
# Get O-RAN standarized SMs' configurations
oran_sm = ric.get_oran_sm_conf()
# Loop oran_sm to get each SM confiugration object
for sm_info in cust_sm:
# Get name
sm_name = sm_info.name
# Get time
sm_time = sm_info.time
# Convert type of time from int to enum defined in swig wrapper for subscription function
tti = get_oran_tti(sm_time)
# Get format of action definition
sm_format = sm_info.format
# Get RAN type
ran_type = sm_info.ran_type
# Get lenght of action definition
act_len = sm_info.act_len
# Get configured measurment names and store them in a list to create action definition in SWIG level
act = []
for act_name in sm_info.actions:
act.append(act_name) -
Input: N/A
-
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment oran_sm
tuple
vector<swig_sub_oran_sm_t>
This parameter contains the configured O-RAN standarized service models' information read from Sub_ORAN_SM_List
in xApp configuration file
conn_e2_nodes()
-
Example:
# Get connected E2-Nodes
conn = ric.conn_e2_nodes()
# Case 1: if no E2-Node connected, do no excuting xApp anymore
assert(len(conn) > 0)
# Case 2: if the number of connected E2-Nodes is greater than 0
# Loop conn to get each connected E2-Node' information
for i in range(0, len(conn)):
# Get NodeB ID
nb_id = conn[i].id.nb_id.nb_id
# Get PLMN
plmn_mcc = conn[i].id.plmn.mcc
plmn_mnc = conn[i].id.plmn.mnc
# Get RAN type
e2node_type = conn[i].id.type
# Check the configured RAN type is the same as the connceted E2-Node's RAN type
# Note: configured_ran_type is read from the xApp configuration file
if configured_ran_type == ric.get_e2ap_ngran_name(e2node_type):
# do somthing, i.e., send subscription request
-
Input: N/A
-
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment conn
tuple
vector<E2Node>
This parameter contains the connected E2-Nodes' information, including globle E2-Node ID and RAN function definitions
Report Service - Step 12 to 20
- Used functions
report_{SM-NAME}_sm()
-
Example - Customized SM:
{SM-NAME} = mac
# Define a handler to save the RIC Request ID
mac_hndlr = []
# Subscribe MAC RAN functions on all the connected E2-Nodes
for i in range(0, len(conn)):
# define a callback for MAC SM
mac_cb = MACCallback()
# Send subscription request to conn[i] by giving:
# global E2-Node ID, transmission time intervale of indicaiton message, and defined callback function
hndlr = ric.report_mac_sm(conn[i].id, tti, mac_cb)
# Append the RIC Request ID
mac_hndlr.append(hndlr) -
Input:
Parameter Type in Python Type in Swig Wrapper C++ Comment conn[i].id
xapp_sdk.swig_global_e2_node_id_t
swig_global_e2_node_id_t*
Stands for Global E2-Node ID tti
int
Interval
Time duration of each RIC Indication read from xapp.conf mac_cb
class
mac_cb*
Defined callback function in xApp -
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment hndlr
int
int
RIC Request ID -
Example - O-RAN Standarized SM:
{SM-NAME} = kpm
# Define a handler to save the RIC Request ID
kpm_hndlr = []
# Subscribe KPM RAN functions on all the connected E2-Nodes
for i in range(0, len(conn)):
# define a callback for KPM SM
kpm_cb = KPMCallback()
# Send subscription request to conn[i] by giving:
# global E2-Node ID, transmission time intervale of indicaiton message,
# list of measurment names, and defined callback function
hndlr = ric.report_kpm_sm(conn[i].id, tti, act, kpm_cb)
# Append the RIC Request ID
kpm_hndlr.append(hndlr) -
Input:
Parameter Type in Python Type in Swig Wrapper C++ Comment conn[i].id
xapp_sdk.swig_global_e2_node_id_t
swig_global_e2_node_id_t*
Stands for Global E2-Node ID tti
int
Interval
Time duration of each RIC Indication read from xapp.conf act
list
vector<string>& action*
List of measurement names read from xapp.conf kpm_cb
class
kpm_cb*
Defined callback function in xApp -
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment hndlr
int
int
RIC Request ID
xApp Logic - Step 21 to 23
- Used functions
{SM-NAME}Callback()
-
Example - Customized SM:
{SM-NAME} = mac
# MACCallback class is defined and derived from C++ class mac_cb
class MACCallback(ric.mac_cb):
# Define Python class 'constructor'
def __init__(self):
ric.mac_cb.__init__(self)
# Override C++ method: virtual void handle(swig_mac_ind_msg_t a) = 0;
def handle(self, ind):
# Print value stored in swig_mac_ind_msg_t
if len(ind.ue_stats) > 0:
t_now = time.time_ns() / 1000.0
t_mac = ind.tstamp / 1.0
t_diff = t_now - t_mac
print(f"MAC Indication tstamp {t_now} diff {t_diff} E2-node type {ind.id.type} nb_id {ind.id.nb_id.nb_id}") -
Input of
MACCallback()
:Parameter Type in Python Type in Swig Wrapper C++ Comment ric.mac_cb
__main__.MACCallback
struct
Defined struct in C++ includes a virtual function handle
and a virtual destructor -
Input of
handle()
:Parameter Type in Python Type in Swig Wrapper C++ Comment ind
xapp_sdk.swig_mac_ind_msg_t
swig_mac_ind_msg_t*
A RIC Indication message storing data in swig_mac_ind_msg_t
Note: Once the RIC Subscription Request is received by the E2-Node, the xApp will continue to receive RIC Indications forwarded from the C SDK to the Python SDK via
handle()
. For further development (i.e., ML or AI usage), developers can process the received RIC Indications to analyze/optimize network performance.
Control Service - Step 24 to 30
- Used functions
control_{SM-NAME}_sm()
-
Example - Customized SM:
{SM-NAME} = slice
# Create slice control message by following the structure slice_ctrl_msg_t
msg = fill_slice_ctrl_msg()
# Send control request to conn[i] by giving:
# global E2-Node ID and predefined contorl message
ric.control_slice_sm(conn[i].id, msg) -
Input:
Parameter Type in Python Type in Swig Wrapper C++ Comment conn[i].id
xapp_sdk.swig_global_e2_node_id_t
swig_global_e2_node_id_t*
Stands for Global E2-Node ID msg
xapp_sdk.slice_ctrl_msg_t
slice_ctrl_msg_t*
Control message written by xApp -
Output: N/A
Exit - Step 31 -41
- Used functions
rm_report_{SM-NAME]_sm()
-
Example - Customized SM:
{SM-NAME} = mac
# Loop list of handler to send subcription delete request to each recorded RIC Request ID
for i in range(0, len(mac_hndlr)):
ric.rm_report_mac_sm(mac_hndlr[i]) -
Input:
Parameter Type in Python Type in Swig Wrapper C++ Comment mac_hndlr[i]
int
int
RIC Request ID -
Output: N/A
try_stop()
-
Example - Customized SM:
{SM-NAME} = mac
# Call the stop function every one second until it recevies a true value
ans = ric.try_stop()
while ans == 0:
time.sleep(1)
ans = ric.try_stop() -
Input: N/A
-
Output:
Parameter Type in Python Type in Swig Wrapper C++ Comment ans
bool
bool
The status of terminating an xApp: "TRUE" means the xApp has terminated completely, including freeing the memory; "FALSE" indicates the opposite.