Telecommunications engineering is the discipline where a single mishandled timer expiry in an RRC state machine can drop ten thousand active calls, where a SIP parser that misreads a Via header routes voice traffic into a black hole, and where the protocol specifications you implement against — the 3GPP TS 38.xxx series alone runs to tens of thousands of pages — are updated every quarterly release cycle. The global telecommunications industry generates over $1.7 trillion in annual revenue, and the engineers who build, maintain, and evolve this infrastructure work at the intersection of real-time systems programming, formal protocol specification, distributed systems architecture, and regulatory compliance that spans every jurisdiction on earth. You are not building a web application that can tolerate a 500ms hiccup — you are building the infrastructure that web applications run on, where a 1ms latency violation in a URLLC bearer breaks the contract with an autonomous vehicle relying on that link, where a missed HARQ retransmission in the MAC layer cascades into an RLC reassembly failure that corrupts a PDCP SDU carrying a VoNR voice frame, and where “five nines” availability (99.999%) means you are allowed 5.26 minutes of downtime per year, not per month.
This guide evaluates every major AI coding tool through the lens of what telecommunications engineers actually build. We tested each tool against seven core task areas: 5G NR protocol stack implementation (RRC state machines, MAC scheduling, RLC segmentation, PDCP ciphering, SDAP QoS mapping, ASN.1 PER encoding), SIP/VoIP and IMS integration (SIP message parsing, SDP offer/answer, dialog state machines, Diameter interfaces, RTP/RTCP media handling, VoLTE/VoNR), NFV/CNF orchestration (MANO lifecycle management, cloud-native network functions on Kubernetes, 5G Service Based Architecture, ETSI NFV standards), protocol state machine design (formal correctness, timer management, race condition handling, guard conditions), real-time signaling systems (HARQ timing, GTP-U tunnel management, SCTP multi-homing, PFCP session control, URLLC latency budgets), 3GPP conformance and testing (TTCN-3 test suites, conformance test cases, protocol trace analysis), and network telemetry and observability (SNMP, NETCONF/YANG, gNMI streaming, KPI computation, alarm correlation). Every code example is production-realistic — real 3GPP information element names, real timer identifiers, real protocol message structures.
If your work focuses more on general network infrastructure and routing, see our Networking Engineers guide. If you build firmware for radio hardware or IoT devices on the network edge, see the Embedded/IoT Engineers guide. If your primary concern is latency profiling and throughput optimization at the system level, see the Performance Engineers guide.
- Best free ($0): GitHub Copilot Free — decent protocol scaffolding and boilerplate for message structures; 2,000 completions/mo covers light protocol development.
- Best overall ($20/mo): Cursor Pro — multi-file context handles protocol stack layers, state machines, and configuration together across your project tree.
- Best for reasoning ($20/mo): Claude Code — strongest at 3GPP specification interpretation, protocol state machine correctness verification, and signaling flow analysis across layers.
- Best combo ($30/mo): Claude Code + Copilot Pro — Claude for spec reasoning and protocol correctness, Copilot for fast inline completions during routine coding.
- Budget ($0): Copilot Free + Gemini CLI Free.
Why Telecommunications Engineering Is Different
Telecommunications engineers evaluate AI tools on a fundamentally different axis than application developers. A web developer asks “does this tool write good React?” A telecom engineer asks “does this tool understand that the T310 timer expiry in RRC_CONNECTED triggers a transition to RRC_IDLE only after T311 has also expired without a suitable cell being found, and that getting this sequence wrong means the UE drops an active VoNR call instead of performing cell reselection?” The evaluation criteria are unique to this domain:
- 5G NR protocol stacks are layered complexity that must interlock precisely. The 3GPP TS 38.xxx specification series defines the New Radio access technology across multiple protocol layers, each with its own state machines, timers, and data processing requirements. The RRC (Radio Resource Control) layer, specified in TS 38.331, manages the connection between UE and gNB through three primary states: RRC_IDLE (no active connection, cell reselection based on SIB information), RRC_INACTIVE (connection context preserved at gNB, UE performs RNA-based mobility), and RRC_CONNECTED (active data transfer, handover managed by gNB). Each state transition involves specific message exchanges — RRCSetupRequest, RRCSetup, RRCSetupComplete for initial access; RRCReconfiguration for bearer modification, handover, and measurement configuration; RRCRelease for connection teardown with optional suspend indication for transition to RRC_INACTIVE. Every message is encoded using ASN.1 PER (Packed Encoding Rules) as specified in TS 38.331 Annex A, where a single bit-offset error in encoding an RRC Information Element produces a message the peer entity cannot decode, triggering an integrity check failure and connection release. Below RRC, the PDCP layer (TS 38.323) handles header compression using ROHC profiles (RFC 5795), integrity protection using 128-bit keys with NIA algorithms, ciphering using NEA algorithms, reordering of out-of-sequence SDUs using a reordering timer (t-Reordering), and duplicate detection using a sliding receive window. The RLC layer (TS 38.322) operates in three modes: Transparent Mode (TM) for broadcast, Unacknowledged Mode (UM) for voice and real-time data where retransmission is worse than loss, and Acknowledged Mode (AM) where ARQ retransmissions with configurable poll timers (t-PollRetransmit) and status triggers ensure reliable delivery. 
The MAC layer (TS 38.321) handles scheduling through DCI (Downlink Control Information) formats, HARQ process management with up to 16 processes in NR, BSR (Buffer Status Reports) for uplink scheduling requests, PHR (Power Headroom Reports), bandwidth part (BWP) switching, and logical channel prioritization. A change in MAC scheduling parameters affects RLC segmentation sizes, which affects PDCP reordering behavior, which affects RRC measurement reporting timing — and an AI tool that generates code for one layer without understanding its impact on adjacent layers produces a protocol stack that fails under load.
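The bit-level fragility of ASN.1 PER described above can be shown with a toy bit-packer. This is not a real TS 38.331 codec (real PER adds length determinants, extension markers, and constrained-range encodings); it just packs fixed-width fields MSB-first to show how a single-bit misalignment corrupts every field that follows:

```python
def encode_bits(fields):
    """Pack (value, bit_width) pairs MSB-first into a bitstring."""
    bits = ""
    for value, width in fields:
        bits += format(value, f"0{width}b")
    return bits

def decode_bits(bits, widths):
    """Unpack fixed-width fields from a bitstring."""
    out, pos = [], 0
    for width in widths:
        out.append(int(bits[pos:pos + width], 2))
        pos += width
    return out

# Example: a 4-bit cause value followed by a 6-bit field.
encoded = encode_bits([(5, 4), (33, 6)])
ok = decode_bits(encoded, [4, 6])                 # round-trips cleanly
shifted = decode_bits(encoded[1:] + "0", [4, 6])  # one-bit offset: garbage
```

The shifted decode returns syntactically valid integers with entirely wrong values, which is exactly why the peer entity cannot detect the error at the bit level and instead fails integrity checking.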
- SIP/VoIP and IMS are stateful protocol nightmares. The Session Initiation Protocol (RFC 3261) is a text-based signaling protocol that appears simple until you encounter the real-world complexity of a production IMS (IP Multimedia Subsystem) deployment. A basic call setup requires an INVITE request flowing through P-CSCF (Proxy-Call Session Control Function), I-CSCF (Interrogating-CSCF for HSS lookup), and S-CSCF (Serving-CSCF for service execution), each adding Via headers, Record-Route headers, and P-headers for charging and access network information. The SDP (Session Description Protocol, RFC 4566) body in the INVITE carries the offer with media descriptions — codec capabilities (AMR-WB for VoLTE, EVS for VoNR, with specific mode-set parameters), RTP port numbers, ICE candidates for NAT traversal, DTLS-SRTP fingerprints for media encryption, and bandwidth modifiers. The answer in the 200 OK must intersect the offered codecs, select compatible parameters, and include its own transport addresses. Dialog state management tracks early dialogs (after provisional 1xx responses with To-tag), confirmed dialogs (after 2xx), and terminated dialogs (after BYE or error responses) — and a single dialog can fork into multiple early dialogs when an INVITE reaches multiple endpoints through a forking proxy. Transaction state machines (RFC 3261 Section 17) manage retransmissions for unreliable transports (UDP), with Timer A for INVITE retransmit intervals, Timer B for INVITE transaction timeout (64*T1 = 32 seconds), Timer D for the wait time that absorbs INVITE response retransmissions after the transaction completes (32 seconds for UDP), and Timer F for non-INVITE transaction timeout. The Diameter protocol interfaces in IMS add another layer: Cx (HSS to I-CSCF/S-CSCF for registration and routing), Rx (P-CSCF to PCRF for policy and charging rules), Gx (PCRF to PCEF for policy enforcement), Gy (OCS for online charging), and Sh (AS to HSS for subscriber data).
Each Diameter interface has its own AVP (Attribute-Value Pair) dictionary, command codes, and state machines. An AI tool that generates “SIP handling code” using stateless request-response patterns like HTTP produces a VoIP system that drops calls on network packet loss, fails on forked INVITE responses, and cannot maintain dialog state across re-INVITE for hold/resume or codec renegotiation.
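The retransmission arithmetic comes straight from RFC 3261 Section 17.1.1.2: Timer A starts at T1 (500 ms by default) and doubles after each INVITE retransmission over UDP, while Timer B ends the transaction at 64*T1. A minimal sketch of the resulting schedule (a real stack also stops retransmitting on the first provisional response, which this omits):

```python
T1_MS = 500  # RTT estimate, RFC 3261 default

def invite_retransmit_schedule(t1_ms=T1_MS):
    """Return (retransmit times in ms after initial send, Timer B in ms).

    Timer A starts at T1 and doubles after each INVITE retransmission;
    Timer B fires at 64*T1 and terminates the client transaction.
    """
    timer_b = 64 * t1_ms
    schedule, elapsed, interval = [], 0, t1_ms
    while elapsed + interval < timer_b:
        elapsed += interval
        schedule.append(elapsed)
        interval *= 2  # Timer A doubles each time
    return schedule, timer_b
```

With the default T1 this yields six retransmissions (at 0.5, 1.5, 3.5, 7.5, 15.5, and 31.5 seconds) before Timer B fires at 32 seconds — state an AI tool must generate explicitly, not approximate with HTTP-style fire-and-forget requests.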
- Network Function Virtualization transforms telecom infrastructure. The telecom industry’s migration from purpose-built hardware appliances to virtualized and cloud-native network functions represents the largest infrastructure transformation in its history. ETSI NFV defines the MANO (Management and Orchestration) architecture: NFVO (NFV Orchestrator) manages network service lifecycle and resource orchestration across VIMs, VNFM (VNF Manager) handles individual VNF lifecycle operations (instantiation, scaling, healing, termination), and VIM (Virtualized Infrastructure Manager, typically OpenStack or Kubernetes) manages compute, storage, and networking resources. The 5G Core (5GC) takes this further with the Service Based Architecture (SBA) defined in TS 23.501, where network functions communicate via HTTP/2-based service interfaces: AMF (Access and Mobility Management Function) handles registration, connection, and mobility management; SMF (Session Management Function) manages PDU sessions and UPF selection; UPF (User Plane Function) processes and forwards user data packets; PCF (Policy Control Function) provides policy rules; UDM (Unified Data Management) stores subscriber data; AUSF (Authentication Server Function) handles authentication; NRF (Network Repository Function) provides service discovery; and NSSF (Network Slice Selection Function) selects network slices. Each NF must support service registration with NRF, OAuth2-based authorization for inter-NF communication (TS 33.501), graceful scaling (draining existing sessions before removing instances), heartbeat-based health monitoring, and the N32 interface for inter-PLMN security. 
Cloud-native deployments add Kubernetes-specific concerns: StatefulSets for NFs with persistent state (SMF session state, UDM subscriber data), custom operators for lifecycle management, Multus for multiple CNI attachments (separate interfaces for signaling, user plane, and management), and DPDK/SR-IOV for user plane data path performance. An AI tool that treats network function deployment like a standard web microservice — stateless, horizontally scalable, restart-anywhere — produces deployments that lose signaling state on pod restart, break mid-call during scaling events, and cannot meet the 99.999% availability SLA that telecom operators contractually guarantee.
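The graceful-scaling requirement can be made concrete with a small sketch: a stateful NF (an SMF here) must refuse new sessions once draining begins and only report itself safe to terminate when the last PDU session is gone. Class and method names are illustrative, not from any ETSI NFV or 3GPP API:

```python
class SmfInstance:
    """Toy model of session-draining on a stateful CNF before scale-in."""

    def __init__(self):
        self.sessions = set()
        self.draining = False

    def establish_session(self, seid):
        if self.draining:
            return False  # draining: refuse new load (NRF deregistration done)
        self.sessions.add(seid)
        return True

    def release_session(self, seid):
        self.sessions.discard(seid)

    def begin_drain(self):
        """Called by the orchestrator before terminating this instance."""
        self.draining = True  # a real NF would also deregister from NRF here

    def safe_to_terminate(self):
        return self.draining and not self.sessions
```

A stock Kubernetes Deployment with a default terminationGracePeriodSeconds cannot express this: the preStop/readiness logic has to implement exactly this drain-then-terminate handshake, or mid-call sessions are lost on every scaling event.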
- Protocol state machines must be formally correct. Telecommunications protocols are defined as state machines, and the correctness of those state machines is not a quality-of-service concern — it is a functional requirement. The UE registration procedure (TS 24.501 Section 5.5.1) involves states from DEREGISTERED through REGISTRATION-INITIATED to REGISTERED, with sub-states for normal service, limited service, attempting registration update, and PLMN search. Each transition has preconditions (guard conditions), actions (send message, start timer, update context), and post-conditions. Timer T3510 guards the Registration Request: if no response arrives before T3510 expires, the UE retransmits up to four times (configurable by the network), and if all attempts fail, it enters a backoff state governed by T3511 or T3502 depending on the cause value in the rejection. The PDU Session Establishment procedure (TS 24.501 Section 6.4.1) adds SM states (PDU_SESSION_INACTIVE, PDU_SESSION_ACTIVE, PDU_SESSION_MODIFICATION_PENDING) with their own timer set (T3580, T3581, T3582). Handover state machines are particularly complex: intra-gNB handover, inter-gNB Xn handover (TS 38.423), inter-gNB N2 handover via AMF (TS 23.502 Section 4.9.1), and inter-RAT handover to LTE (TS 23.502 Section 4.11) each follow different message sequences, involve different network entities, and have different failure recovery procedures. Race conditions arise when simultaneous events hit the state machine: what happens when a Handover Command arrives while a Measurement Report is being transmitted? What if an RRC Connection Release arrives during an ongoing Security Mode procedure? Every race condition must be specified and handled. 
An AI tool that generates state machine code with “TODO: handle edge cases” comments instead of explicit handling for every concurrent event is generating a protocol implementation that will fail in the field — not during lab testing with a single UE, but when thousands of UEs hit rare-but-inevitable timing coincidences.
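The explicit-handling discipline can be sketched as a small table-driven FSM: every (state, event) pair is either handled or deliberately ignored, never left as a TODO. The states and the five-attempt limit follow the spirit of TS 24.501 Section 5.5.1, but the transition table is heavily simplified and the names are illustrative:

```python
MAX_ATTEMPTS = 5  # registration attempt counter limit (network-configurable)

class RegistrationFsm:
    """Simplified UE registration FSM with explicit guard handling."""

    def __init__(self):
        self.state = "DEREGISTERED"
        self.attempts = 0

    def handle(self, event):
        key = (self.state, event)
        if key == ("DEREGISTERED", "REGISTER_REQUESTED"):
            self.attempts = 1
            self.state = "REGISTRATION_INITIATED"  # send request, start T3510
        elif key == ("REGISTRATION_INITIATED", "T3510_EXPIRY"):
            if self.attempts < MAX_ATTEMPTS:
                self.attempts += 1  # retransmit request, restart T3510
            else:
                self.state = "ATTEMPTING_REGISTRATION"  # back off under T3511
        elif key == ("REGISTRATION_INITIATED", "REGISTRATION_ACCEPT"):
            self.state = "REGISTERED"
        else:
            pass  # explicitly ignored: this event is invalid in this state
        return self.state
```

The point of the final `else` branch is that "event X in state Y" is a decision the specification forces you to make; silently dropping it here is itself an explicit, reviewable choice rather than an accident.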
- Real-time signaling has sub-millisecond constraints. Telecom signaling operates under latency budgets that make web application response times look glacial. In 5G NR, the HARQ (Hybrid Automatic Repeat Request) feedback timing is determined by the K1 value in DCI format 1_0/1_1, which specifies the slot offset between PDSCH reception and HARQ-ACK transmission — typically 1-8 slots, where each slot at 30 kHz subcarrier spacing is 0.5ms, meaning the UE must decode a transport block, check the CRC, and prepare ACK/NACK feedback in under 0.5ms in the tightest configuration. On the network side, the SCTP (Stream Control Transmission Protocol, RFC 4960) transport for signaling interfaces (S1-AP/NG-AP between gNB and core, Diameter between core NFs) requires multi-homing management — maintaining primary and backup paths, detecting path failures via heartbeat chunks, and performing failover without losing in-flight signaling messages. GTP-U (GPRS Tunneling Protocol User Plane, TS 29.281) tunnel management on the UPF must process millions of packets per second, each requiring TEID (Tunnel Endpoint Identifier) lookup, QoS marking based on QFI (QoS Flow Identifier), usage metering for charging, and lawful intercept duplication — all at line rate on 100Gbps interfaces. PFCP (Packet Forwarding Control Protocol, TS 29.244) sessions between SMF and UPF must be established within the time budget of a PDU Session Establishment procedure (typically under 1 second E2E including UE-gNB-AMF-SMF signaling). URLLC (Ultra-Reliable Low-Latency Communication) bearers target 1ms one-way user plane latency with 99.999% reliability, requiring pre-configured scheduling grants, mini-slot transmission, and configured grant Type 1/Type 2 mechanisms. 
An AI tool that generates signaling code using synchronous I/O, garbage-collected languages without deterministic latency guarantees, or standard TCP sockets where SCTP is required produces systems that fail timing requirements under load — and in telecom, a timing failure is a service failure.
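The slot arithmetic behind that HARQ budget is mechanical: slot duration is 1 ms / 2^mu for subcarrier spacing 15 * 2^mu kHz (the TS 38.211 numerologies), and K1 counts slots from PDSCH reception to HARQ-ACK transmission. A quick sketch of the deadline computation:

```python
def slot_duration_ms(scs_khz):
    """Slot duration for a given NR subcarrier spacing (TS 38.211)."""
    mu = {15: 0, 30: 1, 60: 2, 120: 3, 240: 4}[scs_khz]
    return 1.0 / (2 ** mu)

def harq_ack_deadline_ms(scs_khz, k1_slots):
    """Time budget from PDSCH slot to HARQ-ACK transmission slot."""
    return slot_duration_ms(scs_khz) * k1_slots

# Tightest common configuration: 30 kHz SCS, K1 = 1 -> 0.5 ms to decode
# the transport block, check CRC, and prepare the ACK/NACK.
tightest = harq_ack_deadline_ms(30, 1)
```

At 120 kHz SCS (FR2) the slot shrinks to 0.125 ms, which is why this processing path cannot tolerate garbage-collection pauses or unbounded syscall latency.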
- Regulatory and standards compliance is non-negotiable. Telecommunications is one of the most heavily regulated industries on earth. In the United States, the FCC regulates spectrum usage under Title 47 CFR Parts 22 (public mobile services), 24 (PCS), 27 (miscellaneous wireless), and 30 (upper microwave for 5G mmWave), with each band having specific technical rules for transmit power, out-of-band emissions, and interference protection. E911 requirements (FCC 07-166) mandate that wireless carriers provide location information for emergency calls — horizontal accuracy within 50 meters for 80% of calls using dispatchable location or x/y coordinates, and vertical accuracy within 3 meters for 80% of calls in buildings where z-axis location is required. The transition to NG-911 (NENA i3 architecture) adds SIP-based call routing, GIS-based location determination, and ESInet/PSAP connectivity requirements. CALEA (Communications Assistance for Law Enforcement Act) requires carriers to implement lawful intercept capabilities: mediation functions that can intercept call-identifying information (pen register) and call content (wiretap) based on court-ordered warrants, delivering intercepted data in standard formats (ATIS/TIA J-STD-025B) to law enforcement agencies. Number portability through the NPAC (Number Portability Administration Center) requires SOA (Service Order Administration) interface integration for porting requests, LRN (Location Routing Number) based call routing, and dip queries to carrier databases. 3GPP conformance testing uses TTCN-3 (Testing and Test Control Notation version 3) test suites defined by ETSI, with specific test cases for every protocol procedure. Each of these regulatory requirements translates directly to code — and non-compliance means FCC fines (up to $500,000 per violation for willful violations), loss of operating licenses, and criminal liability. 
An AI tool that does not understand the regulatory context behind the code it generates produces technically functional but legally non-compliant systems.
- Network telemetry and observability at scale requires domain-specific tooling. A large mobile operator manages hundreds of thousands of network elements — gNBs, core NFs, transport switches, microwave links — each generating performance counters, alarms, and configuration events. Legacy management uses SNMP (Simple Network Management Protocol) with MIBs (Management Information Bases) that define the counters and configuration parameters available on each element type — and telecom-specific MIBs (like the 3GPP-defined MIBs for eNB/gNB performance counters) run to thousands of OID entries. Modern management uses NETCONF (RFC 6241) with YANG models (RFC 7950) for configuration and state data, where a single network element may implement dozens of YANG modules with hundreds of containers, lists, and leaf nodes. gNMI (gRPC Network Management Interface) provides streaming telemetry with subscribe-once semantics — the network element pushes counter updates at configurable intervals (sample mode) or on change (on-change mode), eliminating polling overhead. The raw counters must be aggregated into KPIs (Key Performance Indicators): CSSR (Call Setup Success Rate) calculated from successful RRC Connection Setup completions divided by RRC Connection Setup attempts, CDR (Call Drop Rate) from abnormal RRC connection releases divided by total active connections, HOSR (Handover Success Rate) from successful handover completions divided by handover attempts, throughput per cell and per UE, latency percentiles, and dozens more. Alarm management follows the X.733 structured alarm model (ITU-T Recommendation) with mandatory fields: managed object class and instance, event type (communications, processing, environment, quality of service, equipment), probable cause (over 100 defined causes like “loss of signal”, “threshold crossed”, “software error”), perceived severity (critical, major, minor, warning, indeterminate, cleared), and additional text. 
Root cause analysis must correlate alarms across layers — a fiber cut (transport alarm) causes hundreds of cell-level alarms (radio alarms) that should be correlated to a single root cause rather than triggering hundreds of separate investigation tickets. An AI tool generating telecom observability code must understand these domain-specific data models, KPI formulas, and alarm correlation patterns — generic application monitoring tools like Prometheus with default instrumentation miss the domain-specific semantics entirely.
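The KPI formulas above reduce to ratios over raw counters. A sketch, with illustrative counter names (real 3GPP PM counter names in TS 28.552 differ, and vendor mappings vary):

```python
def compute_kpis(counters):
    """Derive headline radio KPIs (percentages) from raw counters."""
    def ratio(num, den):
        # Guard against divide-by-zero on cells with no traffic
        return round(100.0 * num / den, 2) if den else None
    return {
        # CSSR: successful RRC setups / setup attempts
        "cssr_pct": ratio(counters["rrc_setup_success"],
                          counters["rrc_setup_attempts"]),
        # CDR: abnormal releases / active connections
        "cdr_pct": ratio(counters["rrc_abnormal_release"],
                         counters["rrc_active_connections"]),
        # HOSR: successful handovers / handover attempts
        "hosr_pct": ratio(counters["ho_success"],
                          counters["ho_attempts"]),
    }

kpis = compute_kpis({
    "rrc_setup_attempts": 20000, "rrc_setup_success": 19850,
    "rrc_active_connections": 18000, "rrc_abnormal_release": 54,
    "ho_attempts": 5000, "ho_success": 4960,
})
```

The `None` for zero-denominator cells matters operationally: a cell with zero setup attempts has an undefined CSSR, not a 0% or 100% one, and aggregation layers that coerce it to a number skew network-level KPIs.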
Telecommunications Engineering Task Support Matrix
We tested each tool against seven core telecom engineering tasks. Ratings reflect real-world performance on telecom-specific prompts, not generic coding ability.
| Task | Cursor | Copilot | Claude Code | Windsurf | Tabnine | Amazon Q |
|---|---|---|---|---|---|---|
| 5G NR Protocol Implementation | Strong — multi-file indexing handles RRC/PDCP/RLC/MAC layers together | Moderate — generates message structures but misses cross-layer dependencies | Strong — reasons through 3GPP spec references, understands timer interactions and state transition sequences | Moderate — basic struct definitions, limited protocol semantics | Weak — no telecom protocol knowledge | Basic — minimal 3GPP awareness |
| SIP/VoIP & IMS Integration | Strong — indexes SIP stack files and SDP handling together | Moderate — decent SIP message templates, weak on dialog state machines and Diameter | Strong — understands SIP transaction state machines, SDP negotiation semantics, IMS call flows | Moderate — basic SIP scaffolding, misses RFC nuances | Weak — treats SIP as HTTP-like, no IMS awareness | Basic — Amazon Chime SDK knowledge but not IMS/SIP core |
| NFV/CNF Orchestration | Strong — excellent for Kubernetes operator patterns and multi-file Helm charts | Moderate — good Kubernetes YAML generation, misses telecom-specific lifecycle requirements | Strong — reasons through ETSI NFV lifecycle operations, 5GC SBA interactions | Moderate — standard Kubernetes patterns, no MANO awareness | Basic — generic container patterns | Moderate — good EKS/ECS knowledge, AWS Wavelength awareness for edge |
| Protocol State Machine Design | Moderate — generates state machine structure but misses timer interactions | Basic — simple switch/case FSMs, no understanding of concurrent events or guard conditions | Strong — reasons through state transition sequences, identifies race conditions, generates timer management | Basic — generic FSM patterns without telecom semantics | Weak — no protocol state machine awareness | Basic — AWS Step Functions patterns, not protocol FSMs |
| Real-Time Signaling Systems | Moderate — async patterns available but no telecom timing awareness | Basic — generic async I/O, misses SCTP, GTP-U, PFCP specifics | Strong — understands signaling timing constraints, SCTP multi-homing, GTP-U tunnel semantics | Basic — WebSocket-level real-time, not signaling-level | Weak — no signaling protocol knowledge | Basic — limited to generic event-driven patterns |
| 3GPP Conformance & Testing | Moderate — generates test structures but no TTCN-3 or conformance test awareness | Basic — generic unit test patterns, no protocol test methodology | Strong — understands conformance test structure, can reference 3GPP test case patterns | Basic — standard testing frameworks only | Weak — no telecom testing knowledge | Basic — AWS Device Farm for generic testing, no protocol conformance |
| Network Telemetry & Observability | Strong — indexes YANG models and telemetry configs across project | Moderate — decent SNMP/NETCONF scaffolding, weak on KPI formulas | Strong — understands YANG model structure, KPI computation from raw counters, alarm correlation logic | Moderate — basic monitoring patterns, no telecom KPI knowledge | Basic — generic observability patterns | Moderate — good CloudWatch/Timestream integration, limited YANG/gNMI |
5G NR Protocol Stack Implementation
The 5G NR protocol stack is defined across multiple 3GPP specifications, and implementing it correctly requires understanding not just individual layer behavior but how layers interact under all operating conditions. The RRC layer is the most complex, managing UE state, measurement configuration, bearer setup, handover execution, and system information acquisition. Here is a production-realistic RRC state machine handler covering the core state transitions, timer management, and measurement reporting:
# 5G NR RRC State Machine Handler
# Reference: 3GPP TS 38.331 v17.4.0
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Callable
import time
import struct
import hashlib


class InvalidStateError(Exception):
    """Raised when an RRC message arrives in a state that forbids it."""


class RRCState(Enum):
    RRC_IDLE = auto()
    RRC_INACTIVE = auto()
    RRC_CONNECTED = auto()


class RRCSubState(Enum):
    # RRC_IDLE sub-states
    IDLE_CAMPED_NORMALLY = auto()
    IDLE_ANY_CELL_SELECTION = auto()
    IDLE_CAMPED_ON_ANY_CELL = auto()
    # RRC_INACTIVE sub-states
    INACTIVE_CAMPED_NORMALLY = auto()
    INACTIVE_RNA_UPDATE = auto()
    # RRC_CONNECTED sub-states
    CONNECTED_NORMAL = auto()
    CONNECTED_HANDOVER = auto()
    CONNECTED_RECONFIG = auto()


@dataclass
class RRCTimer:
    """3GPP-defined RRC timer with name, duration, and callback."""
    name: str
    duration_ms: int
    started_at: Optional[float] = None
    max_retries: int = 0
    retry_count: int = 0
    on_expiry: Optional[Callable] = None

    def start(self):
        self.started_at = time.monotonic()

    def stop(self):
        self.started_at = None
        self.retry_count = 0

    def is_running(self) -> bool:
        return self.started_at is not None

    def is_expired(self) -> bool:
        if self.started_at is None:
            return False
        elapsed_ms = (time.monotonic() - self.started_at) * 1000
        return elapsed_ms >= self.duration_ms


@dataclass
class MeasurementConfig:
    """Measurement configuration from RRCReconfiguration."""
    meas_id: int
    meas_object_id: int  # frequency/carrier to measure
    report_config_id: int
    # Event-triggered reporting (A1-A6, B1-B2)
    event_type: str  # "A1", "A2", "A3", "A5", "B1", etc.
    threshold_rsrp: Optional[int] = None  # dBm
    offset_db: float = 0.0
    hysteresis_db: float = 0.0
    time_to_trigger_ms: int = 0
    report_interval_ms: int = 0
    max_report_cells: int = 8


@dataclass
class NRRRCController:
    """5G NR RRC layer state machine controller.

    Implements TS 38.331 RRC state machine with full timer
    management and measurement reporting.
    """
    state: RRCState = RRCState.RRC_IDLE
    sub_state: RRCSubState = RRCSubState.IDLE_CAMPED_NORMALLY
    # UE identity context
    c_rnti: Optional[int] = None
    i_rnti: Optional[int] = None  # for RRC_INACTIVE
    resume_mac_i: Optional[bytes] = None
    # Security context
    kgnb: Optional[bytes] = None
    nas_security_activated: bool = False
    as_security_activated: bool = False
    # Timer bank (TS 38.331 Section 7.1.1)
    timers: Dict[str, RRCTimer] = field(default_factory=dict)
    # Measurement configuration
    meas_configs: Dict[int, MeasurementConfig] = field(default_factory=dict)
    # Pending procedures
    pending_srb_setup: List[int] = field(default_factory=list)
    pending_drb_setup: List[int] = field(default_factory=list)

    def __post_init__(self):
        self._init_timers()

    def _init_timers(self):
        """Initialize 3GPP-defined RRC timers."""
        # TS 38.331 Section 7.1.1 timer definitions
        self.timers = {
            "T300": RRCTimer("T300", 1000, max_retries=4,
                             on_expiry=self._on_t300_expiry),
            "T301": RRCTimer("T301", 1000, max_retries=0,
                             on_expiry=self._on_t301_expiry),
            "T302": RRCTimer("T302", 0),  # set by SIB1 barring
            "T304": RRCTimer("T304", 100,
                             on_expiry=self._on_t304_expiry),
            "T310": RRCTimer("T310", 1000,
                             on_expiry=self._on_t310_expiry),
            "T311": RRCTimer("T311", 3000,
                             on_expiry=self._on_t311_expiry),
            "T319": RRCTimer("T319", 1000, max_retries=4,
                             on_expiry=self._on_t319_expiry),
            "T320": RRCTimer("T320", 0),  # set by RRCRelease
            "T325": RRCTimer("T325", 0),  # logging duration
            "T330": RRCTimer("T330", 0),  # DC config
            "T331": RRCTimer("T331", 0),  # sidelink
        }

    def handle_rrc_setup_request(self, establishment_cause: int,
                                 ue_identity: bytes):
        """Initiate RRC Connection Setup (TS 38.331 5.3.3).

        Args:
            establishment_cause: as per TS 38.331 (0=emergency,
                1=highPriorityAccess, 2=mt-Access, 3=mo-Signalling,
                4=mo-Data, 5=mo-VoiceCall, 6=mo-VideoCall, 7=mo-SMS)
            ue_identity: 39-bit random value or I-RNTI for resume
        """
        if self.state != RRCState.RRC_IDLE:
            raise InvalidStateError(
                f"RRCSetupRequest requires RRC_IDLE, "
                f"current state: {self.state}")
        # Build RRCSetupRequest message (ASN.1 PER encoded)
        msg = self._encode_rrc_setup_request(
            establishment_cause, ue_identity)
        # Start T300 (guards the setup procedure)
        self.timers["T300"].start()
        # Send on SRB0 (CCCH)
        self._send_on_srb(0, msg)

    def handle_rrc_setup(self, msg: bytes):
        """Process RRCSetup from gNB (TS 38.331 5.3.3.4).

        Configures SRB1, applies radio bearer config, and
        transitions toward RRC_CONNECTED.
        """
        if not self.timers["T300"].is_running():
            # Unexpected RRCSetup outside setup procedure
            return
        # Stop T300
        self.timers["T300"].stop()
        # Decode and apply radioBearerConfig
        config = self._decode_rrc_setup(msg)
        # Establish SRB1
        self._setup_srb(1, config.get("srb1_config"))
        # Apply MAC/PHY configuration (cell group config)
        if "cellGroupConfig" in config:
            self._apply_cell_group_config(
                config["cellGroupConfig"])
        # Assign C-RNTI
        self.c_rnti = config.get("c_rnti")
        # Transition to RRC_CONNECTED
        self._transition_to(RRCState.RRC_CONNECTED,
                            RRCSubState.CONNECTED_NORMAL)
        # Send RRCSetupComplete on SRB1 (carries the initial NAS message)
        complete_msg = self._encode_rrc_setup_complete(
            config.get("selected_plmn_identity", 0))
        self._send_on_srb(1, complete_msg)

    def handle_rrc_reconfiguration(self, msg: bytes):
        """Process RRCReconfiguration (TS 38.331 5.3.5).

        Handles bearer setup/modification, measurement config,
        handover commands, and SCell addition/release.
        """
        if self.state != RRCState.RRC_CONNECTED:
            raise InvalidStateError(
                "RRCReconfiguration requires RRC_CONNECTED")
        config = self._decode_rrc_reconfiguration(msg)
        # Check if this is a handover command
        if "mobilityControlInfo" in config:
            self._handle_handover(config)
            return
        self.sub_state = RRCSubState.CONNECTED_RECONFIG
        # Apply measurement configuration
        if "measConfig" in config:
            self._apply_meas_config(config["measConfig"])
        # Setup/modify DRBs
        if "drb_ToAddModList" in config:
            for drb in config["drb_ToAddModList"]:
                self._setup_drb(drb["drb_Identity"],
                                drb.get("pdcp_Config"),
                                drb.get("sdap_Config"))
        # Release DRBs
        if "drb_ToReleaseList" in config:
            for drb_id in config["drb_ToReleaseList"]:
                self._release_drb(drb_id)
        # Apply SCell configuration
        if "sCellToAddModList" in config:
            for scell in config["sCellToAddModList"]:
                self._configure_scell(scell)
        self.sub_state = RRCSubState.CONNECTED_NORMAL
        # Send RRCReconfigurationComplete
        self._send_on_srb(
            1, self._encode_rrc_reconfiguration_complete())

    def _handle_handover(self, config: dict):
        """Execute intra-NR handover (TS 38.331 5.3.5.4)."""
        self.sub_state = RRCSubState.CONNECTED_HANDOVER
        mobility = config["mobilityControlInfo"]
        target_pci = mobility["targetPhysCellId"]
        new_c_rnti = mobility.get("newUE_Identity")
        # Derive new KgNB from current KgNB and NCC
        ncc = mobility.get("nextHopChainingCount", 0)
        self.kgnb = self._derive_kgnb_star(
            self.kgnb, target_pci, ncc)
        # Start T304 (handover execution timer)
        t304_ms = mobility.get("t304", 100)
        self.timers["T304"].duration_ms = t304_ms
        self.timers["T304"].start()
        # Perform random access on target cell
        rach_config = mobility.get("rach_ConfigDedicated")
        self._initiate_contention_free_ra(
            target_pci, rach_config)
        # Apply target cell configuration
        if "cellGroupConfig" in config:
            self._apply_cell_group_config(
                config["cellGroupConfig"])
        self.c_rnti = new_c_rnti

    def handle_handover_complete(self):
        """Handover completed successfully."""
        if self.sub_state != RRCSubState.CONNECTED_HANDOVER:
            return
        self.timers["T304"].stop()
        self.sub_state = RRCSubState.CONNECTED_NORMAL
        # Send RRCReconfigurationComplete to target gNB
        self._send_on_srb(
            1, self._encode_rrc_reconfiguration_complete())

    def _on_t300_expiry(self):
        """T300 expired: RRC setup attempt failed."""
        timer = self.timers["T300"]
        if timer.retry_count < timer.max_retries:
            timer.retry_count += 1
            timer.start()  # Retry (retransmit the stored RRCSetupRequest)
        else:
            # All attempts exhausted, inform upper layers
            self._notify_nas("RRC_SETUP_FAILURE",
                             cause="T300_EXPIRED")
            timer.stop()

    def _on_t304_expiry(self):
        """T304 expired: Handover failure (TS 38.331 5.3.5.8).

        UE must initiate RRC re-establishment or fallback
        to source cell if possible.
        """
        self.timers["T304"].stop()
        self.sub_state = RRCSubState.CONNECTED_NORMAL
        # Attempt RRC re-establishment
        self._initiate_rrc_reestablishment(
            cause="handoverFailure")

    def _on_t310_expiry(self):
        """T310 expired: Radio link failure detected.

        Physical layer has reported consecutive out-of-sync
        indications. Start T311 for cell selection.
        TS 38.331 Section 5.3.10.3.
        """
        self.timers["T310"].stop()
        # Start T311 (cell selection after RLF)
        self.timers["T311"].start()
        # Suspend all SRBs except SRB0
        self._suspend_srbs()
        # Initiate cell selection procedure
        self._start_cell_selection_after_rlf()

    def _on_t311_expiry(self):
        """T311 expired: No suitable cell found after RLF.

        Transition to RRC_IDLE. Connection is lost.
        """
        self.timers["T311"].stop()
        self._transition_to(RRCState.RRC_IDLE,
                            RRCSubState.IDLE_ANY_CELL_SELECTION)
        self._notify_nas("RRC_CONNECTION_FAILURE",
                         cause="RADIO_LINK_FAILURE")

    def _on_t301_expiry(self):
        """T301 expired: RRC re-establishment failed."""
        self.timers["T301"].stop()
        self._transition_to(RRCState.RRC_IDLE,
                            RRCSubState.IDLE_CAMPED_NORMALLY)
        self._notify_nas("RRC_REESTABLISHMENT_FAILURE",
                         cause="T301_EXPIRED")

    def _on_t319_expiry(self):
        """T319 expired: RRC resume failed."""
        timer = self.timers["T319"]
        if timer.retry_count < timer.max_retries:
            timer.retry_count += 1
            timer.start()
        else:
            timer.stop()
            self._transition_to(RRCState.RRC_IDLE,
                                RRCSubState.IDLE_CAMPED_NORMALLY)
            self._notify_nas("RRC_RESUME_FAILURE",
                             cause="T319_EXPIRED")

    def _apply_meas_config(self, meas_config: dict):
        """Apply measurement configuration (TS 38.331 5.5.2)."""
        # Add/modify measurement objects
        if "measObjectToAddModList" in meas_config:
            for obj in meas_config["measObjectToAddModList"]:
                self._add_meas_object(obj)
        # Add/modify report configurations
        if "reportConfigToAddModList" in meas_config:
            for rpt in meas_config["reportConfigToAddModList"]:
                self._add_report_config(rpt)
        # Add/modify measurement IDs (link object to report)
        if "measIdToAddModList" in meas_config:
            for mid in meas_config["measIdToAddModList"]:
                self.meas_configs[mid["measId"]] = MeasurementConfig(
                    meas_id=mid["measId"],
                    meas_object_id=mid["measObjectId"],
                    report_config_id=mid["reportConfigId"],
                    event_type=mid.get("event", "A3"))
        # Quantity config (filtering coefficients)
if "quantityConfig" in meas_config:
self._apply_quantity_config(
meas_config["quantityConfig"])
def evaluate_measurement_events(self, serving_rsrp: float,
neighbor_measurements: dict):
"""Evaluate measurement event triggers (TS 38.331 5.5.4).
Args:
serving_rsrp: Serving cell RSRP in dBm
neighbor_measurements: {pci: rsrp_dbm} for neighbors
"""
for meas_id, config in self.meas_configs.items():
if config.event_type == "A3":
# A3: Neighbour becomes offset better than SpCell
for pci, rsrp in neighbor_measurements.items():
mn = rsrp # Mn: measurement of neighbour
ms = serving_rsrp # Ms: measurement of SpCell
off = config.offset_db # Off: a3-Offset
hys = config.hysteresis_db
# Entering condition A3-1 (Ofn/Ocn/Ofs/Ocs
# taken as 0): Mn - Hys > Ms + Off
if mn - hys > ms + off:
self._trigger_measurement_report(
meas_id, pci, rsrp)
elif config.event_type == "A2":
# A2: Serving becomes worse than threshold
thresh = config.threshold_rsrp
hys = config.hysteresis_db
if serving_rsrp + hys < thresh:
self._trigger_measurement_report(
meas_id, None, serving_rsrp)
def _transition_to(self, new_state: RRCState,
new_sub_state: RRCSubState):
"""Execute state transition with cleanup."""
old_state = self.state
# State exit actions
if old_state == RRCState.RRC_CONNECTED and \
new_state != RRCState.RRC_CONNECTED:
self._release_all_drbs()
self.meas_configs.clear()
self.as_security_activated = False
if old_state == RRCState.RRC_CONNECTED and \
new_state == RRCState.RRC_INACTIVE:
# Preserve context for resume
pass # i-RNTI and resume MAC-I already stored
if new_state == RRCState.RRC_IDLE:
self.c_rnti = None
self.i_rnti = None
self.kgnb = None
self._stop_all_timers()
self.state = new_state
self.sub_state = new_sub_state
# ... (placeholder methods for encoding/decoding/transport)
def _encode_rrc_setup_request(self, cause, identity):
"""ASN.1 PER encode RRCSetupRequest."""
pass
def _decode_rrc_setup(self, msg): return {}
def _decode_rrc_reconfiguration(self, msg): return {}
def _encode_rrc_setup_complete(self, plmn_id): return b""
def _encode_rrc_reconfiguration_complete(self): return b""
def _send_on_srb(self, srb_id, msg): pass
def _setup_srb(self, srb_id, config): pass
def _setup_drb(self, drb_id, pdcp, sdap): pass
def _release_drb(self, drb_id): pass
def _release_all_drbs(self): pass
def _configure_scell(self, config): pass
def _apply_cell_group_config(self, config): pass
def _notify_nas(self, event, **kwargs): pass
def _suspend_srbs(self): pass
def _start_cell_selection_after_rlf(self): pass
def _initiate_rrc_reestablishment(self, cause): pass
def _initiate_contention_free_ra(self, pci, config): pass
def _derive_kgnb_star(self, kgnb, pci, ncc): return b""
def _add_meas_object(self, obj): pass
def _add_report_config(self, rpt): pass
def _apply_quantity_config(self, config): pass
def _trigger_measurement_report(self, mid, pci, rsrp): pass
def _stop_all_timers(self):
for t in self.timers.values():
t.stop()
class InvalidStateError(Exception):
pass
Key things AI tools get wrong here: generating RRC state machines without T310/T311 cascading (radio link failure requires T310 expiry to trigger T311, and T311 expiry to trigger transition to IDLE — not a direct jump), missing the handover T304 timer that guards the handover execution window, and treating measurement events as simple threshold comparisons without hysteresis and time-to-trigger filtering. Claude Code handles the 3GPP timer semantics well because it can reason through the spec’s procedural descriptions. Cursor excels when the protocol stack files are already in the project context, indexing across RRC/PDCP/RLC/MAC layers. Copilot generates reasonable message structure boilerplate but misses the timer interaction logic that makes a protocol implementation correct under failure conditions.
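The time-to-trigger filtering mentioned above can be sketched as a small gate in front of the entering condition: per TS 38.331 Section 5.5.4.1, the condition must hold continuously for the whole timeToTrigger window before a report fires. This is an illustrative implementation, not 3GPP reference code; the class and method names are ours.

```python
import time
from typing import Dict, Optional

class A3TimeToTrigger:
    """Gate an A3 entering condition with timeToTrigger:
    the condition must hold for the full TTT window before
    a measurement report is sent. Names are illustrative."""

    def __init__(self, ttt_ms: int = 640):
        self.ttt_ms = ttt_ms
        self._entered_at: Dict[int, float] = {}  # pci -> window start

    def update(self, pci: int, condition_met: bool,
               now: Optional[float] = None) -> bool:
        """Feed one measurement sample; returns True only
        once the entering condition has held for TTT."""
        now = time.monotonic() if now is None else now
        if not condition_met:
            # Condition broken: restart the TTT window
            self._entered_at.pop(pci, None)
            return False
        start = self._entered_at.setdefault(pci, now)
        return (now - start) * 1000.0 >= self.ttt_ms
```

Wired into the A3 loop, `_trigger_measurement_report` would only be called when `update(pci, entering_condition)` returns True, which suppresses reports from momentary RSRP spikes.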
SIP/VoIP Message Processing
SIP is a text-based protocol that looks deceptively simple but contains enormous complexity in dialog management, transaction state machines, and SDP offer/answer negotiation. A production SIP stack must handle request routing, Via header manipulation, Record-Route processing for in-dialog requests, and proper transaction retransmission for unreliable transports. Here is a SIP dialog state machine with SDP handling:
# SIP Dialog State Machine with SDP Negotiation
# Reference: RFC 3261, RFC 3264 (Offer/Answer)
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Tuple
import hashlib
import time
import re
class DialogState(Enum):
NONE = auto()
EARLY = auto() # After 1xx with To-tag
CONFIRMED = auto() # After 2xx
TERMINATED = auto() # After BYE or error
class TransactionState(Enum):
# INVITE client transaction (RFC 3261 Section 17.1.1)
ICT_CALLING = auto()
ICT_PROCEEDING = auto()
ICT_COMPLETED = auto()
ICT_TERMINATED = auto()
# INVITE server transaction (RFC 3261 Section 17.2.1)
IST_PROCEEDING = auto()
IST_COMPLETED = auto()
IST_CONFIRMED = auto()
IST_TERMINATED = auto()
# Non-INVITE client transaction
NICT_TRYING = auto()
NICT_PROCEEDING = auto()
NICT_COMPLETED = auto()
NICT_TERMINATED = auto()
@dataclass
class SDPSession:
"""SDP session description (RFC 4566)."""
version: int = 0
origin_username: str = "-"
origin_session_id: str = ""
origin_session_version: int = 0
connection_address: str = ""
media_descriptions: List[dict] = field(default_factory=list)
@classmethod
def parse(cls, sdp_text: str) -> "SDPSession":
"""Parse SDP from text."""
session = cls()
current_media = None
# RFC 4566 lines end CRLF; tolerate bare LF too
for line in sdp_text.strip().splitlines():
if len(line) < 2 or line[1] != "=":
continue
field_type = line[0]
value = line[2:]
if field_type == "v":
session.version = int(value)
elif field_type == "o":
parts = value.split()
if len(parts) >= 6:
session.origin_username = parts[0]
session.origin_session_id = parts[1]
session.origin_session_version = int(parts[2])
elif field_type == "c":
# c=IN IP4 192.168.1.100
parts = value.split()
if len(parts) >= 3:
addr = parts[2]
if current_media:
current_media["connection"] = addr
else:
session.connection_address = addr
elif field_type == "m":
# m=audio 49170 RTP/AVP 0 8 97
parts = value.split()
if len(parts) < 4: # malformed m= line
continue
current_media = {
"type": parts[0],
"port": int(parts[1]),
"protocol": parts[2],
"formats": parts[3:],
"attributes": {},
"rtpmap": {},
"fmtp": {},
"connection": None,
}
session.media_descriptions.append(current_media)
elif field_type == "a" and current_media:
if ":" in value:
attr_name, attr_value = value.split(":", 1)
if attr_name == "rtpmap":
# a=rtpmap:97 AMR-WB/16000/1
pt_rest = attr_value.split(" ", 1)
if len(pt_rest) == 2:
pt = pt_rest[0]
current_media["rtpmap"][pt] = \
pt_rest[1]
elif attr_name == "fmtp":
pt_rest = attr_value.split(" ", 1)
if len(pt_rest) == 2:
current_media["fmtp"][pt_rest[0]] = \
pt_rest[1]
else:
current_media["attributes"][attr_name] = \
attr_value
else:
# Direction attributes: sendrecv, recvonly, etc.
current_media["attributes"][value] = ""
return session
def negotiate_answer(self, offer: "SDPSession") -> \
"SDPSession":
"""Generate SDP answer from received offer (RFC 3264).
For each media line in the offer, include a corresponding
line in the answer with intersected codecs. If no codecs
match, set port to 0 (reject media line).
"""
answer = SDPSession()
answer.origin_session_id = self.origin_session_id
answer.origin_session_version = \
self.origin_session_version + 1
answer.connection_address = self.connection_address
for offered_media in offer.media_descriptions:
answer_media = {
"type": offered_media["type"],
"protocol": offered_media["protocol"],
"attributes": {},
"rtpmap": {},
"fmtp": {},
"connection": self.connection_address,
}
# Intersect supported codecs
our_codecs = self._get_supported_codecs(
offered_media["type"])
matched_formats = []
for fmt in offered_media["formats"]:
codec_name = offered_media["rtpmap"].get(
fmt, fmt)
if self._codec_supported(
offered_media["type"], codec_name):
matched_formats.append(fmt)
if fmt in offered_media["rtpmap"]:
answer_media["rtpmap"][fmt] = \
offered_media["rtpmap"][fmt]
if fmt in offered_media["fmtp"]:
answer_media["fmtp"][fmt] = \
offered_media["fmtp"][fmt]
if matched_formats:
answer_media["port"] = \
self._allocate_rtp_port()
answer_media["formats"] = matched_formats
# Mirror offer direction (RFC 3264 6.1):
# a sendonly offer gets a recvonly answer
if "sendonly" in offered_media["attributes"]:
answer_media["attributes"]["recvonly"] = ""
else:
answer_media["attributes"]["sendrecv"] = ""
else:
# Reject this media line (port 0)
answer_media["port"] = 0
answer_media["formats"] = \
offered_media["formats"][:1]
answer.media_descriptions.append(answer_media)
return answer
def _get_supported_codecs(self, media_type):
return {}
def _codec_supported(self, media_type, codec_name):
return True
def _allocate_rtp_port(self):
return 0
@dataclass
class SIPDialog:
"""SIP dialog state manager (RFC 3261 Section 12)."""
call_id: str = ""
local_tag: str = ""
remote_tag: str = ""
local_uri: str = ""
remote_uri: str = ""
remote_target: str = "" # Contact URI
route_set: List[str] = field(default_factory=list)
local_cseq: int = 0
remote_cseq: int = 0
state: DialogState = DialogState.NONE
# SDP state
local_sdp: Optional[SDPSession] = None
remote_sdp: Optional[SDPSession] = None
sdp_offer_pending: bool = False
def create_request(self, method: str) -> dict:
"""Create in-dialog request (RFC 3261 Section 12.2.1.1).
Request-URI is remote_target (Contact from peer).
Route headers from route_set.
"""
self.local_cseq += 1
request = {
"method": method,
"request_uri": self.remote_target,
"headers": {
"Via": self._generate_via(),
"From": f"<{self.local_uri}>"
f";tag={self.local_tag}",
"To": f"<{self.remote_uri}>"
f";tag={self.remote_tag}",
"Call-ID": self.call_id,
"CSeq": f"{self.local_cseq} {method}",
"Max-Forwards": "70",
}
}
# Add Route headers from route set
if self.route_set:
# If first route is loose-routing (lr param),
# use route set as-is. Otherwise, first route
# becomes Request-URI (strict routing, RFC 3261
# Section 12.2.1.1).
first_route = self.route_set[0]
if ";lr" in first_route.lower():
request["headers"]["Route"] = \
", ".join(self.route_set)
else:
request["request_uri"] = first_route
remaining = self.route_set[1:]
remaining.append(f"<{self.remote_target}>")
request["headers"]["Route"] = \
", ".join(remaining)
return request
def process_response(self, response: dict):
"""Update dialog state from response."""
status_code = response.get("status_code", 0)
if 100 <= status_code <= 199:
to_tag = self._extract_tag(
response["headers"].get("To", ""))
if to_tag and self.state == DialogState.NONE:
self.remote_tag = to_tag
self.state = DialogState.EARLY
self._update_route_set(response)
self._update_remote_target(response)
elif 200 <= status_code <= 299:
to_tag = self._extract_tag(
response["headers"].get("To", ""))
if to_tag:
self.remote_tag = to_tag
self.state = DialogState.CONFIRMED
self._update_route_set(response)
self._update_remote_target(response)
# Process SDP answer if present
if "body" in response and \
"application/sdp" in response.get(
"headers", {}).get("Content-Type", ""):
self.remote_sdp = SDPSession.parse(
response["body"])
self.sdp_offer_pending = False
elif 300 <= status_code <= 699:
if self.state == DialogState.EARLY:
self.state = DialogState.TERMINATED
def send_bye(self):
"""Terminate dialog with BYE."""
if self.state not in (DialogState.CONFIRMED,
DialogState.EARLY):
return None
request = self.create_request("BYE")
self.state = DialogState.TERMINATED
return request
def handle_reinvite(self, request: dict):
"""Process re-INVITE for hold/resume/codec change."""
if self.state != DialogState.CONFIRMED:
return {"status_code": 481,
"reason": "Call/Transaction Does Not Exist"}
# Validate CSeq
cseq_num = int(
request["headers"]["CSeq"].split()[0])
if cseq_num <= self.remote_cseq:
return {"status_code": 500,
"reason": "CSeq out of order"}
self.remote_cseq = cseq_num
# Process SDP offer
if "body" in request:
offer = SDPSession.parse(request["body"])
self.remote_sdp = offer
# Generate SDP answer
if self.local_sdp:
answer = self.local_sdp.negotiate_answer(offer)
return {
"status_code": 200,
"reason": "OK",
"body": answer,
"headers": {
"Content-Type": "application/sdp"
}
}
return {"status_code": 200, "reason": "OK"}
def _extract_tag(self, header_value: str) -> Optional[str]:
match = re.search(r";tag=([^\s;,]+)", header_value)
return match.group(1) if match else None
def _generate_via(self):
branch = f"z9hG4bK{hashlib.md5(str(time.monotonic()).encode()).hexdigest()[:16]}"
return f"SIP/2.0/UDP 0.0.0.0:5060;branch={branch}"
def _update_route_set(self, response):
rr = response.get("headers", {}).get("Record-Route")
if rr:
# UAC: reverse Record-Route order (RFC 3261 12.1.2)
routes = [r.strip() for r in rr.split(",")]
self.route_set = list(reversed(routes))
def _update_remote_target(self, response):
contact = response.get("headers", {}).get("Contact")
if contact:
match = re.search(r"<([^>]+)>", contact)
if match:
self.remote_target = match.group(1)
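The dialog code above leaves transaction-layer retransmission to a lower layer it does not show. For unreliable transports, RFC 3261 Section 17.1.1.2 has the INVITE client transaction retransmit on Timer A, starting at T1 (500 ms default) and doubling after each firing, until Timer B (64*T1) terminates the transaction. A sketch of that schedule, with an illustrative function name:

```python
T1 = 0.5  # RFC 3261 default T1 in seconds

def invite_retransmission_schedule(timer_b: float = 64 * T1):
    """Yield the offsets (in seconds from the initial send)
    at which an INVITE client transaction retransmits over
    UDP: Timer A starts at T1 and doubles each time, until
    Timer B ends the transaction (RFC 3261 17.1.1.2)."""
    elapsed, interval = 0.0, T1
    while elapsed + interval < timer_b:
        elapsed += interval
        yield elapsed      # retransmit the INVITE here
        interval *= 2
```

With the defaults this yields retransmissions at 0.5, 1.5, 3.5, 7.5, 15.5, and 31.5 seconds, after which Timer B fires and the transaction reports a timeout to the TU.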
AI tools consistently make three mistakes with SIP: treating it as a stateless request-response protocol (like HTTP), ignoring the Record-Route/Route header chain required for in-dialog request routing, and generating SDP answers that simply echo the offer instead of intersecting codec capabilities. Claude Code understands SIP dialog semantics because it can reason through the RFC’s procedural rules. Cursor performs well when the SIP stack code is already in the project — it indexes across dialog, transaction, and transport layers. Copilot generates syntactically correct SIP message construction but misses the branch parameter magic cookie requirement (z9hG4bK prefix, RFC 3261 Section 8.1.1.7) and the strict vs. loose routing distinction.
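The magic cookie check is small enough to show directly. A sketch (function name ours) of distinguishing RFC 3261 branch parameters from pre-3261 ones, which need the older matching rules of Section 17.2.3:

```python
MAGIC_COOKIE = "z9hG4bK"  # RFC 3261 Section 8.1.1.7

def is_rfc3261_branch(via_header: str) -> bool:
    """Return True if the Via header's branch parameter
    starts with the RFC 3261 magic cookie, meaning the
    branch alone identifies the transaction."""
    for param in via_header.split(";")[1:]:
        name, _, value = param.strip().partition("=")
        if name.lower() == "branch":
            return value.startswith(MAGIC_COOKIE)
    return False  # no branch at all: pre-3261 element
```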
Network Function Lifecycle Management
Cloud-native 5G network functions require lifecycle management that goes beyond standard Kubernetes deployment patterns. A telecom CNF must support graceful session draining during scale-in (you cannot kill a pod that has active VoNR calls), health monitoring that understands signaling protocol state (a pod can be “Running” in Kubernetes terms but have lost its NRF registration), and auto-scaling based on telecom-specific KPIs rather than generic CPU/memory metrics:
# Cloud-Native Network Function Lifecycle Manager
# 5G Core CNF with graceful draining and telecom KPI scaling
import asyncio
import aiohttp
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from enum import Enum, auto
class NFState(Enum):
INSTANTIATING = auto()
REGISTERED = auto() # Registered with NRF
ACTIVE = auto() # Serving traffic
DRAINING = auto() # Graceful shutdown
DEREGISTERING = auto() # Removing from NRF
TERMINATED = auto()
@dataclass
class ActiveSession:
"""Tracks an active signaling session on this NF instance."""
session_id: str
session_type: str # "pdu_session", "registration", etc.
created_at: float
last_activity: float
ue_supi: str
priority: int = 0 # Emergency sessions get higher priority
@dataclass
class NFInstance:
"""Represents a single CNF instance (pod)."""
instance_id: str
nf_type: str # "AMF", "SMF", "UPF", "PCF", etc.
state: NFState = NFState.INSTANTIATING
api_endpoint: str = ""
nrf_registration_id: Optional[str] = None
active_sessions: Dict[str, ActiveSession] = field(
default_factory=dict)
health_check_failures: int = 0
last_heartbeat: float = 0.0
class CNFLifecycleManager:
"""Manages lifecycle of 5G Core cloud-native network functions.
Implements ETSI NFV SOL005/SOL006 lifecycle operations adapted
for Kubernetes-native deployment with 3GPP NRF integration.
"""
def __init__(self, nf_type: str, nrf_url: str,
namespace: str = "5gc"):
self.nf_type = nf_type
self.nrf_url = nrf_url
self.namespace = namespace
self.instances: Dict[str, NFInstance] = {}
self.drain_timeout_s = 300 # 5 min max drain
self.heartbeat_interval_s = 10
self.max_health_failures = 3
# Telecom-specific scaling thresholds
self.scale_config = {
"AMF": {
"metric": "active_registrations",
"scale_up_threshold": 50000,
"scale_down_threshold": 20000,
"min_replicas": 2,
"max_replicas": 20,
"cooldown_s": 120,
},
"SMF": {
"metric": "active_pdu_sessions",
"scale_up_threshold": 100000,
"scale_down_threshold": 40000,
"min_replicas": 2,
"max_replicas": 30,
"cooldown_s": 180,
},
"UPF": {
"metric": "throughput_gbps",
"scale_up_threshold": 80, # 80 Gbps per UPF
"scale_down_threshold": 30,
"min_replicas": 2,
"max_replicas": 50,
"cooldown_s": 60,
},
}
async def instantiate(self, instance_id: str,
config: dict) -> NFInstance:
"""Instantiate a new NF instance.
Steps:
1. Create instance record
2. Deploy pod via Kubernetes API
3. Wait for readiness
4. Register with NRF (TS 29.510)
5. Transition to ACTIVE
"""
instance = NFInstance(
instance_id=instance_id,
nf_type=self.nf_type,
api_endpoint=config.get("api_endpoint", ""),
)
self.instances[instance_id] = instance
# Deploy pod (simplified K8s API call)
await self._deploy_pod(instance, config)
# Wait for pod readiness with timeout
ready = await self._wait_for_ready(
instance, timeout_s=120)
if not ready:
instance.state = NFState.TERMINATED
raise NFLifecycleError(
f"Instance {instance_id} failed readiness check")
# Register with NRF (3GPP TS 29.510)
nrf_reg = await self._register_with_nrf(instance)
instance.nrf_registration_id = nrf_reg
instance.state = NFState.REGISTERED
# Start heartbeat loop
asyncio.create_task(
self._heartbeat_loop(instance))
instance.state = NFState.ACTIVE
return instance
async def graceful_drain(self, instance_id: str):
"""Gracefully drain an NF instance before termination.
Critical for telecom: cannot kill pods with active
signaling sessions. Must:
1. Deregister from NRF (stop receiving new sessions)
2. Wait for active sessions to complete or timeout
3. Force-migrate remaining sessions if drain times out
4. Terminate pod
"""
instance = self.instances.get(instance_id)
if not instance or instance.state == NFState.TERMINATED:
return
instance.state = NFState.DRAINING
# Step 1: Deregister from NRF to stop new traffic
await self._deregister_from_nrf(instance)
instance.state = NFState.DEREGISTERING
# Step 2: Wait for sessions to drain naturally
drain_start = time.monotonic()
while instance.active_sessions:
elapsed = time.monotonic() - drain_start
if elapsed > self.drain_timeout_s:
break
# Check for completed sessions
completed = []
for sid, session in instance.active_sessions.items():
idle_time = time.monotonic() - session.last_activity
if idle_time > 60: # 60s idle = likely done
completed.append(sid)
for sid in completed:
del instance.active_sessions[sid]
remaining = len(instance.active_sessions)
if remaining > 0:
# Log progress
await self._log_drain_progress(
instance_id, remaining,
self.drain_timeout_s - elapsed)
await asyncio.sleep(5)
# Step 3: Force-migrate remaining sessions
if instance.active_sessions:
emergency_count = sum(
1 for s in instance.active_sessions.values()
if s.priority > 0)
if emergency_count > 0:
# Emergency sessions: migrate, do not drop
await self._migrate_sessions(
instance,
[s for s in instance.active_sessions.values()
if s.priority > 0])
# Non-emergency: force release with cause
for session in list(
instance.active_sessions.values()):
if session.priority == 0:
await self._force_release_session(
instance, session,
cause="NF_INSTANCE_TERMINATING")
# Step 4: Terminate
await self._terminate_pod(instance)
instance.state = NFState.TERMINATED
async def evaluate_scaling(self):
"""Evaluate telecom KPI-based scaling decisions.
Unlike generic HPA which scales on CPU/memory, telecom
NFs scale on domain-specific metrics: active registrations
for AMF, active PDU sessions for SMF, throughput for UPF.
"""
config = self.scale_config.get(self.nf_type)
if not config:
return
active_instances = [
i for i in self.instances.values()
if i.state == NFState.ACTIVE]
current_count = len(active_instances)
# Collect aggregate metric
total_metric = 0
for instance in active_instances:
metric_val = await self._get_instance_metric(
instance, config["metric"])
total_metric += metric_val
per_instance = (total_metric / current_count
if current_count > 0 else 0)
# Scale up: average per instance exceeds threshold
if per_instance > config["scale_up_threshold"]:
if current_count < config["max_replicas"]:
new_id = f"{self.nf_type.lower()}-" \
f"{int(time.time())}"
await self.instantiate(new_id, {
"api_endpoint": self._generate_endpoint(
new_id)
})
return {"action": "scale_up",
"new_count": current_count + 1,
"reason": f"{config['metric']}="
f"{per_instance:.0f} > "
f"{config['scale_up_threshold']}"}
# Scale down: average below threshold, drain oldest
elif per_instance < config["scale_down_threshold"]:
if current_count > config["min_replicas"]:
# Select instance with fewest active sessions
target = min(active_instances,
key=lambda i: len(i.active_sessions))
await self.graceful_drain(target.instance_id)
return {"action": "scale_down",
"new_count": current_count - 1,
"reason": f"{config['metric']}="
f"{per_instance:.0f} < "
f"{config['scale_down_threshold']}"}
return {"action": "none", "current_count": current_count}
async def _register_with_nrf(self, instance: NFInstance) \
-> str:
"""Register NF instance with NRF (TS 29.510 Section 5.2.2.2).
PUT /nnrf-nfm/v1/nf-instances/{nfInstanceId}
"""
nf_profile = {
"nfInstanceId": instance.instance_id,
"nfType": self.nf_type,
"nfStatus": "REGISTERED",
"ipv4Addresses": [
instance.api_endpoint.split(":")[0]],
"nfServices": [{
"serviceInstanceId": f"{instance.instance_id}"
f"-svc-1",
"serviceName": self._get_service_name(),
"versions": [{"apiVersionInUri": "v1",
"apiFullVersion": "1.0.0"}],
"scheme": "https",
"nfServiceStatus": "REGISTERED",
}],
"heartBeatTimer": self.heartbeat_interval_s,
}
async with aiohttp.ClientSession() as session:
url = (f"{self.nrf_url}/nnrf-nfm/v1/nf-instances/"
f"{instance.instance_id}")
async with session.put(url,
json=nf_profile) as resp:
if resp.status in (200, 201):
return instance.instance_id
raise NFLifecycleError(
f"NRF registration failed: {resp.status}")
async def _deregister_from_nrf(self, instance: NFInstance):
"""Deregister from NRF (TS 29.510 Section 5.2.2.4)."""
async with aiohttp.ClientSession() as session:
url = (f"{self.nrf_url}/nnrf-nfm/v1/nf-instances/"
f"{instance.instance_id}")
async with session.delete(url) as resp:
if resp.status not in (200, 204, 404):
raise NFLifecycleError(
f"NRF deregistration failed: "
f"{resp.status}")
async def _heartbeat_loop(self, instance: NFInstance):
"""NRF heartbeat (TS 29.510 Section 5.2.2.3)."""
while instance.state in (NFState.REGISTERED,
NFState.ACTIVE):
try:
async with aiohttp.ClientSession() as session:
url = (f"{self.nrf_url}/nnrf-nfm/v1/"
f"nf-instances/"
f"{instance.instance_id}")
patch = [{"op": "replace",
"path": "/nfStatus",
"value": "REGISTERED"}]
async with session.patch(
url, json=patch) as resp:
if resp.status == 200:
instance.health_check_failures = 0
instance.last_heartbeat = \
time.monotonic()
else:
instance.health_check_failures += 1
except Exception:
instance.health_check_failures += 1
if instance.health_check_failures >= \
self.max_health_failures:
# Self-heal: re-register
await self._register_with_nrf(instance)
instance.health_check_failures = 0
await asyncio.sleep(self.heartbeat_interval_s)
def _get_service_name(self) -> str:
service_map = {
"AMF": "namf-comm",
"SMF": "nsmf-pdusession",
"UPF": "nupf-ee",
"PCF": "npcf-smpolicycontrol",
"UDM": "nudm-sdm",
"AUSF": "nausf-auth",
"NRF": "nnrf-nfm",
"NSSF": "nnssf-nsselection",
}
return service_map.get(self.nf_type, f"n{self.nf_type.lower()}")
# Stubs for infrastructure operations
async def _deploy_pod(self, inst, config): pass
async def _wait_for_ready(self, inst, timeout_s): return True
async def _terminate_pod(self, inst): pass
async def _get_instance_metric(self, inst, metric): return 0
async def _migrate_sessions(self, inst, sessions): pass
async def _force_release_session(self, inst, session,
cause=""): pass
async def _log_drain_progress(self, iid, remaining,
time_left): pass
def _generate_endpoint(self, new_id): return ""
class NFLifecycleError(Exception):
pass
The critical insight that AI tools miss: standard Kubernetes graceful shutdown (preStop hooks with a fixed timeout) is insufficient for telecom. A pod running an SMF instance may have 100,000 active PDU sessions, and killing that pod means 100,000 users lose data connectivity simultaneously. The drain procedure must first deregister from NRF (so no new sessions arrive), then wait for sessions to complete naturally, then force-migrate high-priority sessions (emergency calls), and only then terminate. Claude Code handles this well because it can reason about the cascading effects of pod termination on active signaling sessions. Cursor excels at multi-file lifecycle management when Helm charts, operator code, and NF application code are all indexed together.
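One concrete way to close the "Running but unregistered" gap described earlier is to key the pod's readinessProbe to NRF state rather than process liveness. A sketch against the NFState and heartbeat fields used above; the function and parameter names are illustrative, and the state is passed as a string for simplicity:

```python
import time
from typing import Optional

def cnf_ready(nf_state: str, last_heartbeat: float,
              heartbeat_interval_s: float = 10.0,
              max_missed: int = 3,
              now: Optional[float] = None) -> bool:
    """Readiness keyed to signaling-plane health, not
    process liveness: the CNF is Ready only while it is
    ACTIVE and its NRF heartbeat is recent. Exposed via
    the pod's readinessProbe, this makes Kubernetes stop
    routing to an instance whose registration has lapsed
    even though the process itself is still Running."""
    now = time.monotonic() if now is None else now
    if nf_state != "ACTIVE":
        return False  # DRAINING/DEREGISTERING: not Ready
    missed_window = heartbeat_interval_s * max_missed
    return (now - last_heartbeat) <= missed_window
```

In a real deployment this check would back an HTTP health endpoint that the readinessProbe polls; returning not-Ready during DRAINING also removes the pod from Service endpoints while sessions drain.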
Protocol State Machine Implementation
The 5G NAS (Non-Access Stratum) PDU Session Establishment procedure involves a multi-step state machine with timers, guard conditions, and error recovery paths. Getting this wrong means users cannot establish data connectivity. Here is a formal state machine for PDU Session Establishment as specified in TS 24.501:
# 5G NAS PDU Session Establishment State Machine
# Reference: 3GPP TS 24.501 Section 6.4.1
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, Callable
import time
class PDUSessionState(Enum):
PDU_SESSION_INACTIVE = auto()
PDU_SESSION_ESTABLISHMENT_PENDING = auto()
PDU_SESSION_ACTIVE = auto()
PDU_SESSION_MODIFICATION_PENDING = auto()
PDU_SESSION_RELEASE_PENDING = auto()
class PDUSessionType(Enum):
IPV4 = 1
IPV6 = 2
IPV4V6 = 3
UNSTRUCTURED = 4
ETHERNET = 5
class EstablishmentCause(Enum):
NORMAL = auto()
EMERGENCY = auto()
HIGH_PRIORITY = auto()
@dataclass
class PDUSessionContext:
"""PDU Session context maintained at UE."""
pdu_session_id: int # 1-15
pti: int # Procedure Transaction Identity (1-254)
session_type: PDUSessionType = PDUSessionType.IPV4V6
ssc_mode: int = 1 # SSC mode 1, 2, or 3
dnn: str = "" # Data Network Name
s_nssai: Optional[dict] = None # Single NSSAI
# Assigned by network
pdu_address: Optional[str] = None
authorized_qos_rules: Dict[int, dict] = field(
default_factory=dict)
authorized_qos_flow_descriptions: Dict[int, dict] = field(
default_factory=dict)
session_ambr: Optional[dict] = None # DL/UL AMBR
# State
state: PDUSessionState = PDUSessionState.PDU_SESSION_INACTIVE
# 5GSM cause from network
last_cause: Optional[int] = None
@dataclass
class NASTimer:
name: str
duration_ms: int
started_at: Optional[float] = None
on_expiry: Optional[Callable] = None
max_retransmissions: int = 0
retransmission_count: int = 0
def start(self):
self.started_at = time.monotonic()
def stop(self):
self.started_at = None
self.retransmission_count = 0
def is_expired(self) -> bool:
if self.started_at is None:
return False
return ((time.monotonic() - self.started_at) * 1000
>= self.duration_ms)
class PDUSessionManager:
"""Manages PDU Session lifecycle per TS 24.501.
Handles establishment, modification, and release with
full timer management, retransmission, and error recovery.
"""
MAX_PDU_SESSIONS = 15
def __init__(self):
self.sessions: Dict[int, PDUSessionContext] = {}
self.timers: Dict[str, NASTimer] = {}
self._next_pti = 1
# Backoff timers from network rejection
self.t3580 = NASTimer("T3580", 16000,
max_retransmissions=4)
self.t3581 = NASTimer("T3581", 16000,
max_retransmissions=4)
self.t3582 = NASTimer("T3582", 16000,
max_retransmissions=4)
def _allocate_pti(self) -> int:
"""Allocate Procedure Transaction Identity (1-254)."""
pti = self._next_pti
self._next_pti = (self._next_pti % 254) + 1
return pti
def initiate_establishment(
self, pdu_session_id: int,
session_type: PDUSessionType,
dnn: str,
s_nssai: Optional[dict] = None,
cause: EstablishmentCause = EstablishmentCause.NORMAL
) -> PDUSessionContext:
"""UE-requested PDU Session Establishment (TS 24.501 6.4.1.2).
Preconditions:
- UE in 5GMM-REGISTERED state
- PDU session ID not already active
- Not barred by T3396/T3584 backoff timer
State: INACTIVE -> ESTABLISHMENT_PENDING
"""
# Guard: session ID already in use
if pdu_session_id in self.sessions:
existing = self.sessions[pdu_session_id]
if existing.state != \
PDUSessionState.PDU_SESSION_INACTIVE:
raise SessionError(
f"PDU session {pdu_session_id} already "
f"in state {existing.state}")
# Guard: max sessions
active = sum(1 for s in self.sessions.values()
if s.state != \
PDUSessionState.PDU_SESSION_INACTIVE)
if active >= self.MAX_PDU_SESSIONS:
raise SessionError(
"Maximum PDU sessions reached")
# Create session context
pti = self._allocate_pti()
session = PDUSessionContext(
pdu_session_id=pdu_session_id,
pti=pti,
session_type=session_type,
dnn=dnn,
s_nssai=s_nssai,
)
# Build PDU SESSION ESTABLISHMENT REQUEST
msg = self._build_establishment_request(session)
# Transition: INACTIVE -> ESTABLISHMENT_PENDING
session.state = \
PDUSessionState.PDU_SESSION_ESTABLISHMENT_PENDING
self.sessions[pdu_session_id] = session
# Start T3580 (guards establishment procedure)
timer_key = f"T3580_{pdu_session_id}"
self.timers[timer_key] = NASTimer(
"T3580", 16000, max_retransmissions=4,
on_expiry=lambda: self._on_t3580_expiry(
pdu_session_id))
self.timers[timer_key].start()
# Send via NAS transport (encapsulated in UL NAS TRANSPORT)
self._send_nas_sm_message(
pdu_session_id, pti, msg)
return session
def handle_establishment_accept(
self, pdu_session_id: int, msg: dict):
"""Process PDU SESSION ESTABLISHMENT ACCEPT (TS 24.501 6.4.1.3).
State: ESTABLISHMENT_PENDING -> ACTIVE
"""
session = self.sessions.get(pdu_session_id)
if not session:
return
if session.state != \
PDUSessionState.PDU_SESSION_ESTABLISHMENT_PENDING:
# Unexpected accept in current state
return
# Stop T3580
timer_key = f"T3580_{pdu_session_id}"
if timer_key in self.timers:
self.timers[timer_key].stop()
# Apply authorized parameters from network
session.session_type = PDUSessionType(
msg.get("selected_pdu_session_type",
session.session_type.value))
session.ssc_mode = msg.get("selected_ssc_mode",
session.ssc_mode)
# PDU address assignment
if "pdu_address" in msg:
session.pdu_address = msg["pdu_address"]
# QoS rules (mandatory IE)
if "authorized_qos_rules" in msg:
session.authorized_qos_rules = \
msg["authorized_qos_rules"]
# Session AMBR (mandatory IE)
if "session_ambr" in msg:
session.session_ambr = msg["session_ambr"]
# QoS flow descriptions (optional)
if "authorized_qos_flow_descriptions" in msg:
session.authorized_qos_flow_descriptions = \
msg["authorized_qos_flow_descriptions"]
# Transition: ESTABLISHMENT_PENDING -> ACTIVE
session.state = PDUSessionState.PDU_SESSION_ACTIVE
# Notify upper layers (IP stack can now use this PDU session)
self._notify_data_plane(pdu_session_id, "ACTIVATED",
session.pdu_address)
def handle_establishment_reject(
self, pdu_session_id: int, msg: dict):
"""Process PDU SESSION ESTABLISHMENT REJECT (TS 24.501 6.4.1.4).
State: ESTABLISHMENT_PENDING -> INACTIVE
Must handle cause values:
- #26: Insufficient resources
- #27: Missing or unknown DNN
- #28: Unknown PDU session type
- #29: User authentication failed
- #31: Request rejected, unspecified
- #33: Requested service option not subscribed
- #36: Regular deactivation
- #43: Invalid PDU session identity
"""
session = self.sessions.get(pdu_session_id)
if not session:
return
# Stop T3580
timer_key = f"T3580_{pdu_session_id}"
if timer_key in self.timers:
self.timers[timer_key].stop()
cause = msg.get("5gsm_cause", 31)
session.last_cause = cause
# Handle backoff timer if present
if "back_off_timer" in msg:
backoff_ms = msg["back_off_timer"] * 1000
backoff_key = f"backoff_{pdu_session_id}_{session.dnn}"
self.timers[backoff_key] = NASTimer(
"T3584", backoff_ms)
self.timers[backoff_key].start()
# Transition: ESTABLISHMENT_PENDING -> INACTIVE
session.state = PDUSessionState.PDU_SESSION_INACTIVE
# Notify upper layers
self._notify_data_plane(pdu_session_id, "REJECTED",
cause=cause)
def _on_t3580_expiry(self, pdu_session_id: int):
"""T3580 expired during PDU Session Establishment.
TS 24.501 Section 6.4.1.6:
If T3580 expires, retransmit the request up to 4 times.
After max retransmissions, abort and move to INACTIVE.
"""
timer_key = f"T3580_{pdu_session_id}"
timer = self.timers.get(timer_key)
session = self.sessions.get(pdu_session_id)
if not timer or not session:
return
if timer.retransmission_count < \
timer.max_retransmissions:
timer.retransmission_count += 1
timer.start() # Restart timer
# Retransmit the establishment request
msg = self._build_establishment_request(session)
self._send_nas_sm_message(
pdu_session_id, session.pti, msg)
else:
# Max retransmissions reached, abort
timer.stop()
session.state = \
PDUSessionState.PDU_SESSION_INACTIVE
self._notify_data_plane(
pdu_session_id, "ESTABLISHMENT_FAILED",
cause="T3580_MAX_RETRANSMISSIONS")
def initiate_release(self, pdu_session_id: int,
cause: int = 36):
"""UE-requested PDU Session Release (TS 24.501 6.4.3.2).
State: ACTIVE -> RELEASE_PENDING
"""
session = self.sessions.get(pdu_session_id)
if not session or \
session.state != PDUSessionState.PDU_SESSION_ACTIVE:
return
pti = self._allocate_pti()
session.pti = pti
# Build release request
msg = {"message_type": "PDU_SESSION_RELEASE_REQUEST",
"5gsm_cause": cause}
session.state = \
PDUSessionState.PDU_SESSION_RELEASE_PENDING
# Start T3582
timer_key = f"T3582_{pdu_session_id}"
self.timers[timer_key] = NASTimer(
"T3582", 16000, max_retransmissions=4,
on_expiry=lambda: self._on_t3582_expiry(
pdu_session_id))
self.timers[timer_key].start()
self._send_nas_sm_message(
pdu_session_id, pti, msg)
def handle_release_command(self, pdu_session_id: int,
msg: dict):
"""Network-initiated release (TS 24.501 6.4.3.3).
State: any -> INACTIVE
"""
session = self.sessions.get(pdu_session_id)
if not session:
return
        # Stop any running timers for this session; match the session
        # id field exactly so session 1 does not stop timers for 11
        for key in list(self.timers.keys()):
            parts = key.split("_")
            if len(parts) > 1 and parts[1] == str(pdu_session_id):
                self.timers[key].stop()
session.last_cause = msg.get("5gsm_cause", 36)
session.state = PDUSessionState.PDU_SESSION_INACTIVE
# Deactivate data plane
self._notify_data_plane(pdu_session_id, "RELEASED",
cause=session.last_cause)
# Send PDU SESSION RELEASE COMPLETE
complete = {"message_type":
"PDU_SESSION_RELEASE_COMPLETE"}
self._send_nas_sm_message(
pdu_session_id, session.pti, complete)
def _on_t3582_expiry(self, pdu_session_id: int):
"""T3582 expired during release."""
timer_key = f"T3582_{pdu_session_id}"
timer = self.timers.get(timer_key)
session = self.sessions.get(pdu_session_id)
if not timer or not session:
return
if timer.retransmission_count < \
timer.max_retransmissions:
timer.retransmission_count += 1
timer.start()
msg = {"message_type":
"PDU_SESSION_RELEASE_REQUEST",
"5gsm_cause": 36}
self._send_nas_sm_message(
pdu_session_id, session.pti, msg)
else:
timer.stop()
session.state = \
PDUSessionState.PDU_SESSION_INACTIVE
self._notify_data_plane(
pdu_session_id, "RELEASE_TIMEOUT")
# Stubs
def _build_establishment_request(self, session): return {}
def _send_nas_sm_message(self, sid, pti, msg): pass
def _notify_data_plane(self, sid, event, **kwargs): pass
class SessionError(Exception):
pass
The state machine correctness requirements here are absolute. Every state transition has preconditions (guard conditions that must be true), actions (messages sent, timers started/stopped, context updated), and post-conditions (the new state and its invariants). AI tools commonly generate state machines that handle the happy path (establishment succeeds) but miss the failure paths: T3580 expiry and retransmission, rejection with backoff timer, network-initiated release during establishment. Claude Code is strongest here because it can reason through the procedural specification text and identify the complete set of state transitions. Cursor helps when the state machine code, timer management, and message encoding are in separate files that need cross-referencing. Copilot generates basic enum-based state machines but consistently misses timer management and retransmission logic.
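The guard-condition discipline described above is easiest to enforce when the legal transitions are data rather than scattered conditionals. A minimal, self-contained sketch (simplified state and event names, not the full 5GSM context class from the listing): any (state, event) pair missing from the table, including every failure path, raises instead of silently falling through.

```python
from enum import Enum, auto

class St(Enum):
    INACTIVE = auto()
    ESTABLISHMENT_PENDING = auto()
    ACTIVE = auto()
    RELEASE_PENDING = auto()

# (state, event) -> next state; event names paraphrase TS 24.501 procedures
TRANSITIONS = {
    (St.INACTIVE, "ESTABLISHMENT_REQUEST"): St.ESTABLISHMENT_PENDING,
    (St.ESTABLISHMENT_PENDING, "ESTABLISHMENT_ACCEPT"): St.ACTIVE,
    (St.ESTABLISHMENT_PENDING, "ESTABLISHMENT_REJECT"): St.INACTIVE,
    (St.ESTABLISHMENT_PENDING, "T3580_MAX_RETX"): St.INACTIVE,
    (St.ACTIVE, "RELEASE_REQUEST"): St.RELEASE_PENDING,
    (St.ACTIVE, "RELEASE_COMMAND"): St.INACTIVE,  # network-initiated
    (St.RELEASE_PENDING, "RELEASE_COMMAND"): St.INACTIVE,
}

def step(state: St, event: str) -> St:
    """Return the next state, refusing any unlisted transition."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(
            f"illegal transition: {state.name} on {event}") from None
```

Reviewing the table against the specification's procedure clauses turns a missing failure path into a one-line diff rather than an untested branch.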
Real-Time Signaling Pipeline
The user plane in 5G is built on GTP-U tunnels between gNB and UPF, managed by PFCP sessions between SMF and UPF. The UPF must process millions of packets per second with per-packet QoS enforcement, charging, and lawful intercept — all at line rate. Here is a GTP-U tunnel manager with PFCP session integration:
# GTP-U Tunnel Manager with PFCP Session Integration
# Reference: TS 29.281 (GTP-U), TS 29.244 (PFCP)
import struct
import socket
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple, List
from enum import IntEnum
import time
class GTPUMessageType(IntEnum):
ECHO_REQUEST = 1
ECHO_RESPONSE = 2
ERROR_INDICATION = 26
SUPPORTED_EXTENSION_HEADERS = 31
END_MARKER = 254
G_PDU = 255 # User data
class PFCPMessageType(IntEnum):
HEARTBEAT_REQUEST = 1
HEARTBEAT_RESPONSE = 2
SESSION_ESTABLISHMENT_REQUEST = 50
SESSION_ESTABLISHMENT_RESPONSE = 51
SESSION_MODIFICATION_REQUEST = 52
SESSION_MODIFICATION_RESPONSE = 53
SESSION_DELETION_REQUEST = 54
SESSION_DELETION_RESPONSE = 55
SESSION_REPORT_REQUEST = 56
SESSION_REPORT_RESPONSE = 57
@dataclass
class GTPUHeader:
"""GTP-U header (TS 29.281 Section 5.1).
Flags: Version(3b)=1, PT(1b)=1, E/S/PN bits
Message Type: 255 for G-PDU (user data)
Length: payload length excluding first 8 mandatory bytes
TEID: Tunnel Endpoint Identifier (32-bit)
"""
version: int = 1
pt: int = 1 # Protocol Type (1=GTP)
e_flag: int = 0 # Extension header flag
s_flag: int = 0 # Sequence number flag
pn_flag: int = 0 # N-PDU number flag
message_type: int = GTPUMessageType.G_PDU
length: int = 0
teid: int = 0
sequence_number: Optional[int] = None
n_pdu_number: Optional[int] = None
extension_headers: List[bytes] = field(
default_factory=list)
def encode(self) -> bytes:
"""Encode GTP-U header to bytes."""
flags = ((self.version & 0x07) << 5) | \
((self.pt & 0x01) << 4) | \
((self.e_flag & 0x01) << 2) | \
((self.s_flag & 0x01) << 1) | \
(self.pn_flag & 0x01)
header = struct.pack("!BBHI",
flags,
self.message_type,
self.length,
self.teid)
        # Optional fields present if any of E/S/PN set
        if self.e_flag or self.s_flag or self.pn_flag:
            seq = self.sequence_number or 0
            npdu = self.n_pdu_number or 0
            ext_type = (self.extension_headers[0][0]
                        if self.extension_headers else 0)
            header += struct.pack("!HBB", seq, npdu, ext_type)
            for ext in self.extension_headers:
                # Extensions are stored type-prefixed; the type
                # travels in the preceding "next extension header
                # type" byte, so strip it from the wire encoding
                header += ext[1:]
        return header
@classmethod
def decode(cls, data: bytes) -> Tuple["GTPUHeader", int]:
"""Decode GTP-U header from bytes. Returns (header, offset)."""
if len(data) < 8:
raise ValueError("GTP-U header too short")
flags, msg_type, length, teid = struct.unpack(
"!BBHI", data[:8])
header = cls()
header.version = (flags >> 5) & 0x07
header.pt = (flags >> 4) & 0x01
header.e_flag = (flags >> 2) & 0x01
header.s_flag = (flags >> 1) & 0x01
header.pn_flag = flags & 0x01
header.message_type = msg_type
header.length = length
header.teid = teid
offset = 8
if header.e_flag or header.s_flag or header.pn_flag:
if len(data) < 12:
raise ValueError(
"GTP-U optional fields truncated")
seq, npdu, ext_type = struct.unpack(
"!HBB", data[8:12])
header.sequence_number = seq
header.n_pdu_number = npdu
offset = 12
            # Parse extension headers (length-prefixed in 4-octet
            # units, each terminated by a next-type byte)
            while ext_type != 0:
                if offset >= len(data):
                    break
                ext_len = data[offset] * 4  # length in 4-byte units
                if ext_len == 0 or offset + ext_len > len(data):
                    break  # malformed length; stop parsing
                # Store type-prefixed, matching the encode-side layout
                header.extension_headers.append(
                    bytes([ext_type]) + data[offset:offset + ext_len])
                ext_type = data[offset + ext_len - 1]
                offset += ext_len
return header, offset
@dataclass
class TunnelEndpoint:
"""GTP-U tunnel endpoint definition."""
teid: int
remote_addr: str
remote_port: int = 2152 # Standard GTP-U port
local_teid: int = 0
qfi: int = 0 # QoS Flow Identifier
# Traffic counters
rx_packets: int = 0
rx_bytes: int = 0
tx_packets: int = 0
tx_bytes: int = 0
# Sequence numbering
tx_sequence: int = 0
rx_expected_sequence: int = 0
@dataclass
class PFCPSession:
"""PFCP session context (TS 29.244)."""
seid_local: int # Local Session Endpoint ID
seid_remote: int = 0 # Remote SEID (from SMF)
pdr_list: Dict[int, dict] = field(default_factory=dict)
far_list: Dict[int, dict] = field(default_factory=dict)
qer_list: Dict[int, dict] = field(default_factory=dict)
urr_list: Dict[int, dict] = field(default_factory=dict)
# PDR: Packet Detection Rule (match criteria)
# FAR: Forwarding Action Rule (what to do with matched packet)
# QER: QoS Enforcement Rule (rate limiting)
# URR: Usage Reporting Rule (charging)
class GTPUTunnelManager:
"""Manages GTP-U tunnels with PFCP session integration.
Handles tunnel creation/deletion, packet encapsulation/
decapsulation, QoS marking, and usage metering.
"""
def __init__(self, local_addr: str, gtpu_port: int = 2152):
self.local_addr = local_addr
self.gtpu_port = gtpu_port
self.tunnels: Dict[int, TunnelEndpoint] = {} # by local TEID
self.pfcp_sessions: Dict[int, PFCPSession] = {}
self._next_teid = 1
self._next_seid = 1
self.socket: Optional[socket.socket] = None
    def allocate_teid(self) -> int:
        """Allocate unique local TEID."""
        teid = self._next_teid
        # TEID is a 32-bit field; wrap explicitly, and skip 0,
        # which is reserved (TS 29.281 Section 5.1)
        self._next_teid = (self._next_teid + 1) & 0xFFFFFFFF
        if self._next_teid == 0:
            self._next_teid = 1
        return teid
def create_tunnel(self, remote_addr: str,
remote_teid: int,
qfi: int = 0) -> TunnelEndpoint:
"""Create a new GTP-U tunnel endpoint."""
local_teid = self.allocate_teid()
tunnel = TunnelEndpoint(
teid=remote_teid,
remote_addr=remote_addr,
local_teid=local_teid,
qfi=qfi,
)
self.tunnels[local_teid] = tunnel
return tunnel
def encapsulate(self, tunnel: TunnelEndpoint,
payload: bytes,
qfi: Optional[int] = None) -> bytes:
"""Encapsulate IP packet in GTP-U.
Adds GTP-U header with remote TEID, optional QFI
extension header for 5G QoS marking.
"""
use_qfi = qfi if qfi is not None else tunnel.qfi
header = GTPUHeader(
message_type=GTPUMessageType.G_PDU,
teid=tunnel.teid, # Remote TEID
)
        # Add PDU Session Container extension header for 5G
        # (TS 38.415 Section 5.5.2, DL PDU SESSION INFORMATION)
        if use_qfi > 0:
            header.e_flag = 1
            # Extension header type 0x85 = PDU Session Container;
            # content: PDU Type 0 in octet 1, QFI in the low six
            # bits of octet 2
            ext = struct.pack("!BBBB",
                1,               # length in 4-byte units
                (0 << 4),        # PDU Type 0, spare bits
                use_qfi & 0x3F,  # QFI
                0)               # next extension type: none
            header.extension_headers = [
                bytes([0x85]) + ext]
        # The length field counts everything after the first 8
        # header octets; derive it from the actual encoding so it
        # cannot drift from the extension-header layout
        header.length = len(payload) + len(header.encode()) - 8
# Update counters
tunnel.tx_packets += 1
tunnel.tx_bytes += len(payload)
tunnel.tx_sequence = \
(tunnel.tx_sequence + 1) & 0xFFFF
return header.encode() + payload
def decapsulate(self, data: bytes) \
-> Optional[Tuple[TunnelEndpoint, bytes, int]]:
"""Decapsulate GTP-U packet.
Returns (tunnel, payload, qfi) or None if unknown TEID.
"""
header, offset = GTPUHeader.decode(data)
if header.message_type == GTPUMessageType.ECHO_REQUEST:
self._handle_echo_request(data)
return None
if header.message_type != GTPUMessageType.G_PDU:
return None
# Look up tunnel by local TEID
local_teid = header.teid
tunnel = self.tunnels.get(local_teid)
if not tunnel:
# Unknown TEID - send Error Indication
self._send_error_indication(
local_teid, data)
return None
        # Extract QFI from PDU Session Container if present
        # (stored as [type, length, octet1, octet2, next]; QFI is
        # the low six bits of content octet 2 per TS 38.415)
        qfi = 0
        if header.e_flag and header.extension_headers:
            for ext in header.extension_headers:
                if len(ext) >= 4 and ext[0] == 0x85:
                    qfi = ext[3] & 0x3F
payload = data[offset:]
# Update counters
tunnel.rx_packets += 1
tunnel.rx_bytes += len(payload)
return tunnel, payload, qfi
def apply_pfcp_rules(self, session: PFCPSession,
packet: bytes, direction: str) \
-> Optional[Tuple[str, dict]]:
"""Apply PFCP PDR/FAR rules to a packet.
Matches packet against PDRs, applies the associated
FAR action (forward, duplicate, buffer, drop).
"""
# Match against PDRs in priority order
matched_pdr = None
for pdr_id, pdr in sorted(
session.pdr_list.items(),
key=lambda x: x[1].get("precedence", 255)):
if self._packet_matches_pdr(
packet, pdr, direction):
matched_pdr = pdr
break
if not matched_pdr:
return None # No matching rule, drop
# Get associated FAR
far_id = matched_pdr.get("far_id")
if far_id is None or far_id not in session.far_list:
return None
far = session.far_list[far_id]
action = far.get("apply_action", "drop")
        # Apply QER if present (rule IDs may be 0, so test for None)
        qer_id = matched_pdr.get("qer_id")
        if qer_id is not None and qer_id in session.qer_list:
qer = session.qer_list[qer_id]
if not self._check_qos_enforcement(
packet, qer, direction):
return ("drop", {"reason": "QER_EXCEEDED"})
        # Apply URR for usage reporting
        urr_id = matched_pdr.get("urr_id")
        if urr_id is not None and urr_id in session.urr_list:
self._update_usage_report(
session.urr_list[urr_id],
len(packet), direction)
return (action, far)
def _packet_matches_pdr(self, packet: bytes,
pdr: dict,
direction: str) -> bool:
"""Check if packet matches PDR criteria."""
pdi = pdr.get("pdi", {})
# Source interface check
if pdi.get("source_interface") and \
pdi["source_interface"] != direction:
return False
# TEID match (for GTP-U encapsulated traffic)
if "local_f_teid" in pdi:
# Already matched by tunnel lookup
pass
# UE IP address match
if "ue_ip_address" in pdi and len(packet) >= 20:
version = (packet[0] >> 4) & 0x0F
if version == 4:
if direction == "uplink":
src_ip = socket.inet_ntoa(packet[12:16])
if src_ip != pdi["ue_ip_address"]:
return False
else:
dst_ip = socket.inet_ntoa(packet[16:20])
if dst_ip != pdi["ue_ip_address"]:
return False
# SDF filter (5-tuple matching)
if "sdf_filter" in pdi:
if not self._match_sdf_filter(
packet, pdi["sdf_filter"]):
return False
return True
def _check_qos_enforcement(self, packet: bytes,
qer: dict,
direction: str) -> bool:
"""Enforce QoS rate limits (token bucket)."""
key = "dl" if direction == "downlink" else "ul"
mbr = qer.get(f"maximum_bitrate_{key}", 0)
if mbr == 0:
return True # No limit
# Token bucket algorithm
bucket = qer.setdefault(f"_bucket_{key}", {
"tokens": mbr,
"last_update": time.monotonic(),
"rate": mbr, # bits per second
})
now = time.monotonic()
elapsed = now - bucket["last_update"]
bucket["tokens"] = min(
bucket["rate"],
bucket["tokens"] + elapsed * bucket["rate"])
bucket["last_update"] = now
packet_bits = len(packet) * 8
if bucket["tokens"] >= packet_bits:
bucket["tokens"] -= packet_bits
return True
return False
def _update_usage_report(self, urr: dict,
packet_size: int,
direction: str):
"""Update usage reporting counters."""
key = f"volume_{direction}"
urr[key] = urr.get(key, 0) + packet_size
urr[f"packets_{direction}"] = \
urr.get(f"packets_{direction}", 0) + 1
def _handle_echo_request(self, data): pass
def _send_error_indication(self, teid, data): pass
def _match_sdf_filter(self, packet, sdf): return True
The critical details AI tools miss in GTP-U: TEID 0 is reserved and must never be used for user data tunnels (TS 29.281 Section 5.1), the PDU Session Container extension header (type 0x85) is required in 5G to carry the QFI, and an Error Indication message must be sent back when a G-PDU arrives with an unknown TEID rather than silently dropping it. Claude Code handles the GTP-U header encoding correctly because it can reason about the bit-level flag encoding (version, PT, E/S/PN in a single byte). Copilot generates reasonable struct.pack/unpack code for binary protocols but misses the extension header chaining mechanism. Cursor helps most when GTP-U handling, PFCP session management, and QoS enforcement code are in separate modules that reference each other.
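The Error Indication left as a stub in the listing can be sketched as follows — a hedged sketch per TS 29.281 Section 7.3.1, assuming IPv4 and the standard IE layout: the message is sent with TEID 0 and the S flag set, carrying the offending TEID in a Tunnel Endpoint Identifier Data I IE (type 16) and the sender's address in a GTP-U Peer Address IE (type 133). The function name is illustrative, not part of the manager above.

```python
import socket
import struct

def build_error_indication(offending_teid: int,
                           local_addr: str,
                           sequence: int = 0) -> bytes:
    """Build a GTP-U Error Indication (message type 26)."""
    # Tunnel Endpoint Identifier Data I IE: type 16, fixed 4-byte value
    ie_teid = struct.pack("!BI", 16, offending_teid)
    # GTP-U Peer Address IE: type 133, 2-byte length, then the address
    addr = socket.inet_aton(local_addr)
    ie_peer = struct.pack("!BH", 133, len(addr)) + addr
    payload = ie_teid + ie_peer
    # Header: version 1, PT=1, S flag set; TEID 0 for this message.
    # The length field counts everything after the first 8 octets,
    # including the 4 optional octets (seq, N-PDU, next-ext-type).
    flags = (1 << 5) | (1 << 4) | (1 << 1)
    header = struct.pack("!BBHI", flags, 26, len(payload) + 4, 0)
    header += struct.pack("!HBB", sequence, 0, 0)
    return header + payload
```

Sending this back to the G-PDU's source lets the peer tear down its stale tunnel context instead of blackholing traffic indefinitely.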
3GPP Conformance Testing
Conformance testing in telecommunications is fundamentally different from unit testing in application development. Test cases are derived directly from 3GPP specification procedures, and each test verifies a specific protocol behavior with specific message sequences, timer values, and state transitions. ETSI defines conformance test suites in TTCN-3 (Testing and Test Control Notation), and even when implementing custom test frameworks, the test structure must mirror the 3GPP specification organization:
# 3GPP-style Conformance Test Framework
# Protocol conformance testing for NR RRC procedures
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Callable, Any
from enum import Enum, auto
import time
class TestVerdict(Enum):
PASS = auto()
FAIL = auto()
INCONCLUSIVE = auto()
ERROR = auto()
NONE = auto()
class MessageDirection(Enum):
UE_TO_NETWORK = auto() # Uplink
NETWORK_TO_UE = auto() # Downlink
@dataclass
class ProtocolMessage:
"""Represents a protocol message in a test sequence."""
direction: MessageDirection
message_type: str
ies: Dict[str, Any] = field(default_factory=dict)
timestamp: float = 0.0
@dataclass
class TestStep:
"""A single step in a conformance test case."""
step_number: int
description: str
action: Callable
expected_verdict: TestVerdict = TestVerdict.PASS
timeout_ms: int = 5000
@dataclass
class ConformanceTestCase:
"""3GPP-style conformance test case.
Structure mirrors ETSI TS 38.523 (5GS; UE conformance
specification; Part 1: Protocol).
"""
test_id: str # e.g., "8.1.1.1.1"
title: str
spec_reference: str # e.g., "TS 38.331 Section 5.3.3"
purpose: str
preconditions: List[str] = field(default_factory=list)
steps: List[TestStep] = field(default_factory=list)
verdict: TestVerdict = TestVerdict.NONE
messages_captured: List[ProtocolMessage] = field(
default_factory=list)
class NRRRCConformanceSuite:
"""Conformance test suite for NR RRC procedures.
    Test cases derived from 3GPP TS 38.523-1.
"""
def __init__(self, ue_under_test, network_simulator):
self.ue = ue_under_test
self.network = network_simulator
self.test_cases: List[ConformanceTestCase] = []
self._build_test_cases()
def _build_test_cases(self):
"""Build conformance test cases from spec."""
# TC 8.1.1.1.1: RRC Connection Setup - Normal
tc = ConformanceTestCase(
test_id="8.1.1.1.1",
title="RRC connection setup - success",
spec_reference="TS 38.331 Section 5.3.3",
purpose="Verify UE correctly performs RRC connection "
"setup procedure when network accepts the "
"request.",
preconditions=[
"UE is in RRC_IDLE state",
"UE is camped on a suitable NR cell",
"NAS has triggered connection establishment",
],
)
tc.steps = [
TestStep(1,
"UE sends RRCSetupRequest on SRB0 (CCCH)",
lambda: self._verify_rrc_setup_request()),
TestStep(2,
"Network sends RRCSetup",
lambda: self._send_rrc_setup()),
TestStep(3,
"Verify UE transitions to RRC_CONNECTED",
lambda: self._verify_ue_state("RRC_CONNECTED")),
TestStep(4,
"Verify UE sends RRCSetupComplete on SRB1",
lambda: self._verify_rrc_setup_complete()),
TestStep(5,
"Verify T300 is stopped",
lambda: self._verify_timer_stopped("T300")),
TestStep(6,
"Verify SRB1 is established",
lambda: self._verify_srb_established(1)),
]
self.test_cases.append(tc)
        # TC 8.1.1.2.1: RRC Setup - T300 Expiry
        tc_t300 = ConformanceTestCase(
            test_id="8.1.1.2.1",
            title="RRC connection setup - T300 expiry",
            spec_reference="TS 38.331 Section 5.3.3.7",
            purpose="Verify that on T300 expiry the UE resets MAC, "
                    "informs upper layers of the failure, and "
                    "remains in RRC_IDLE. Per TS 38.331, RRC does "
                    "not autonomously retransmit RRCSetupRequest; "
                    "any retry is driven by upper layers (NAS).",
            preconditions=[
                "UE is in RRC_IDLE state",
                "UE is camped on a suitable NR cell",
                "Network is configured to NOT respond to "
                "RRCSetupRequest",
            ],
        )
        tc_t300.steps = [
            TestStep(1,
                "UE sends RRCSetupRequest on SRB0",
                lambda: self._verify_rrc_setup_request()),
            TestStep(2,
                "Wait for T300 expiry (network does not respond)",
                lambda: self._wait_for_timer_expiry("T300"),
                timeout_ms=2000),
            TestStep(3,
                "Verify UE informs NAS of setup failure",
                lambda: self._verify_nas_notification(
                    "RRC_SETUP_FAILURE")),
            TestStep(4,
                "Verify UE remains in RRC_IDLE",
                lambda: self._verify_ue_state("RRC_IDLE")),
        ]
        self.test_cases.append(tc_t300)
# TC 8.1.3.1.1: RRC Reconfiguration - Handover
tc_ho = ConformanceTestCase(
test_id="8.1.3.1.1",
title="RRC reconfiguration - intra-NR handover",
spec_reference="TS 38.331 Section 5.3.5.4",
purpose="Verify UE correctly executes handover via "
"RRCReconfiguration with mobilityControlInfo.",
preconditions=[
"UE is in RRC_CONNECTED state",
"Active DRB established",
"Measurement reporting configured",
],
)
tc_ho.steps = [
TestStep(1,
"Configure measurement (Event A3, offset 3dB)",
lambda: self._configure_measurement_a3(
offset_db=3.0)),
TestStep(2,
"Simulate neighbour cell becoming stronger",
lambda: self._set_cell_rsrp(
target_pci=2, rsrp=-80,
serving_rsrp=-90)),
TestStep(3,
"Verify UE sends MeasurementReport with A3 event",
lambda: self._verify_measurement_report(
event="A3", pci=2),
timeout_ms=5000),
TestStep(4,
"Send RRCReconfiguration with mobilityControlInfo",
lambda: self._send_handover_command(
target_pci=2)),
TestStep(5,
"Verify T304 is started",
lambda: self._verify_timer_running("T304")),
TestStep(6,
"Verify UE performs RACH on target cell",
lambda: self._verify_rach_on_target(pci=2),
timeout_ms=1000),
TestStep(7,
"Verify UE sends RRCReconfigurationComplete",
lambda: self._verify_message_sent(
"RRCReconfigurationComplete")),
TestStep(8,
"Verify T304 is stopped",
lambda: self._verify_timer_stopped("T304")),
TestStep(9,
"Verify data plane continuity on new cell",
lambda: self._verify_data_continuity()),
]
self.test_cases.append(tc_ho)
def run_test_case(self, test_id: str) -> TestVerdict:
"""Execute a single conformance test case."""
tc = next((t for t in self.test_cases
if t.test_id == test_id), None)
if not tc:
raise ValueError(f"Test case {test_id} not found")
# Verify preconditions
for precond in tc.preconditions:
if not self._check_precondition(precond):
tc.verdict = TestVerdict.INCONCLUSIVE
return tc.verdict
# Execute steps
for step in tc.steps:
try:
start = time.monotonic()
result = step.action()
elapsed_ms = (time.monotonic() - start) * 1000
if elapsed_ms > step.timeout_ms:
tc.verdict = TestVerdict.FAIL
self._log_failure(tc, step,
f"Step timed out ({elapsed_ms:.0f}ms "
f"> {step.timeout_ms}ms)")
return tc.verdict
if result is False:
tc.verdict = TestVerdict.FAIL
self._log_failure(tc, step,
"Step verification failed")
return tc.verdict
except Exception as e:
tc.verdict = TestVerdict.ERROR
self._log_failure(tc, step, str(e))
return tc.verdict
tc.verdict = TestVerdict.PASS
return tc.verdict
def run_suite(self) -> Dict[str, TestVerdict]:
"""Run all test cases and return results."""
results = {}
for tc in self.test_cases:
results[tc.test_id] = self.run_test_case(tc.test_id)
return results
# Verification methods (implementation depends on test harness)
def _verify_rrc_setup_request(self): return True
def _send_rrc_setup(self): return True
def _verify_ue_state(self, state): return True
def _verify_rrc_setup_complete(self): return True
def _verify_timer_stopped(self, name): return True
def _verify_timer_running(self, name): return True
def _verify_srb_established(self, srb_id): return True
def _wait_for_timer_expiry(self, name): return True
def _verify_retransmission(self, msg_type): return True
def _verify_max_retransmissions(self, msg, count): return True
def _verify_nas_notification(self, event): return True
def _configure_measurement_a3(self, offset_db): return True
def _set_cell_rsrp(self, **kwargs): return True
def _verify_measurement_report(self, **kwargs): return True
def _send_handover_command(self, target_pci): return True
def _verify_rach_on_target(self, pci): return True
def _verify_message_sent(self, msg_type): return True
def _verify_data_continuity(self): return True
def _check_precondition(self, precond): return True
def _log_failure(self, tc, step, msg): pass
Conformance test structure is fundamentally different from unit testing: each test case maps to a specific 3GPP specification section, preconditions establish the protocol state before the test begins, and steps follow the exact message exchange sequence defined in the specification. AI tools that generate generic pytest-style test cases miss the protocol-level structure entirely — there is no “mock” for a radio link failure, and asserting on function return values does not verify that the correct RRC message was sent on the correct SRB with the correct ASN.1-encoded IEs. Claude Code understands the conformance test structure because it can reference the procedural descriptions in 3GPP specifications. Cursor helps when test cases, protocol handlers, and message encoders are in the same project, allowing cross-file verification.
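One TTCN-3 behavior worth carrying into a custom framework like the suite above: the verdict overwrite rules in ETSI ES 201 873-1 say a verdict may only be replaced by a worse one, so a later passing step can never mask an earlier failure. A minimal sketch of that lattice, with illustrative names (the suite above sets verdicts directly instead):

```python
from enum import IntEnum

class Verdict(IntEnum):
    # Ordered per TTCN-3 overwrite rules: higher value = worse
    NONE = 0
    PASS = 1
    INCONC = 2
    FAIL = 3
    ERROR = 4

def setverdict(current: Verdict, new: Verdict) -> Verdict:
    """Merge verdicts TTCN-3 style: keep whichever is worse."""
    return max(current, new)
```

Folding each step's verdict through `setverdict` makes a multi-step test case's final verdict order-independent and monotonic, which matters when steps log intermediate `pass` verdicts.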
Network Telemetry Collection
Telecom network telemetry requires domain-specific KPI computation from raw performance counters, alarm correlation across network layers, and YANG model-driven data collection. A telemetry system that does not understand the relationship between radio-level counters (RRC setup attempts/completions) and the KPIs operators track (CSSR, CDR) provides raw data without operational insight:
# Telecom Network Telemetry: KPI Engine + Alarm Correlator
# YANG-driven collection, 3GPP KPI computation, X.733 alarms
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Set
from enum import Enum, auto
from datetime import datetime, timedelta
import re
import math
class AlarmSeverity(Enum):
CRITICAL = 1
MAJOR = 2
MINOR = 3
WARNING = 4
INDETERMINATE = 5
CLEARED = 6
class AlarmEventType(Enum):
COMMUNICATIONS = auto()
PROCESSING = auto()
ENVIRONMENT = auto()
QUALITY_OF_SERVICE = auto()
EQUIPMENT = auto()
@dataclass
class X733Alarm:
"""ITU-T X.733 structured alarm."""
alarm_id: str
managed_object_class: str # e.g., "NRCellDU", "GnbDuFunction"
managed_object_instance: str # DN path
event_type: AlarmEventType
probable_cause: str # e.g., "threshold-crossed"
perceived_severity: AlarmSeverity
specific_problem: str
additional_text: str
event_time: datetime
notification_id: int = 0
correlated_notifications: List[int] = field(
default_factory=list)
root_cause_alarm_id: Optional[str] = None
@dataclass
class PerformanceCounter:
"""Raw performance counter from a network element."""
counter_name: str # 3GPP-defined counter name
counter_value: float
collection_time: datetime
granularity_period_s: int = 900 # Typically 15 min
managed_element: str = ""
cell_id: str = ""
class TelecomKPIEngine:
"""Computes telecom KPIs from raw 3GPP performance counters.
KPI definitions from 3GPP TS 28.552 (5G NR Performance
Measurements) and TS 32.450 (KPI definitions).
"""
def __init__(self):
self.counter_store: Dict[str, List[PerformanceCounter]] = {}
self.kpi_history: Dict[str, List[Tuple[datetime, float]]] = {}
def ingest_counters(self, counters: List[PerformanceCounter]):
"""Ingest raw performance counters."""
for counter in counters:
key = (f"{counter.managed_element}:"
f"{counter.cell_id}:"
f"{counter.counter_name}")
if key not in self.counter_store:
self.counter_store[key] = []
self.counter_store[key].append(counter)
def compute_cssr(self, cell_id: str,
period_start: datetime,
period_end: datetime) -> Optional[float]:
"""Call Setup Success Rate (CSSR).
CSSR = (RRC.ConnEstabSucc / RRC.ConnEstabAtt) * 100
Counters (TS 28.552):
- RRC.ConnEstabAtt: Total RRC connection setup attempts
- RRC.ConnEstabSucc: Successful RRC setups (per cause)
Target: >= 99.0%
"""
attempts = self._sum_counter(
cell_id, "RRC.ConnEstabAtt",
period_start, period_end)
successes = self._sum_counter(
cell_id, "RRC.ConnEstabSucc",
period_start, period_end)
if attempts == 0:
return None # No data
cssr = (successes / attempts) * 100.0
self._store_kpi(f"{cell_id}:CSSR",
period_end, cssr)
return cssr
def compute_cdr(self, cell_id: str,
period_start: datetime,
period_end: datetime) -> Optional[float]:
"""Call Drop Rate (CDR).
CDR = (RRC.ConnRelAbnormal / RRC.ConnMean) * 100
Counters:
- RRC.ConnRelAbnormal: Abnormal RRC releases
(radio link failure, handover failure, etc.)
- RRC.ConnMean: Mean number of active RRC connections
during the measurement period
Target: <= 1.0%
"""
        abnormal_releases = self._sum_counter(
            cell_id, "RRC.ConnRelAbnormal",
            period_start, period_end)
mean_connections = self._avg_counter(
cell_id, "RRC.ConnMean",
period_start, period_end)
if mean_connections == 0:
return None
cdr = (abnormal_releases / mean_connections) * 100.0
self._store_kpi(f"{cell_id}:CDR", period_end, cdr)
return cdr
def compute_hosr(self, cell_id: str,
period_start: datetime,
period_end: datetime) -> Optional[float]:
"""Handover Success Rate (HOSR).
HOSR = (HO.ExeSucc / HO.ExeAtt) * 100
Counters (TS 28.552):
- HO.ExeAtt: Handover execution attempts
(intra-freq + inter-freq + inter-RAT)
- HO.ExeSucc: Successful handover executions
Target: >= 98.0%
"""
ho_att = self._sum_counter(
cell_id, "HO.ExeAtt",
period_start, period_end)
ho_succ = self._sum_counter(
cell_id, "HO.ExeSucc",
period_start, period_end)
if ho_att == 0:
return None
hosr = (ho_succ / ho_att) * 100.0
self._store_kpi(f"{cell_id}:HOSR", period_end, hosr)
return hosr
def compute_dl_throughput(self, cell_id: str,
period_start: datetime,
period_end: datetime) \
-> Optional[float]:
"""Average downlink cell throughput (Mbps).
DL Throughput = (DRB.PdcpSduBitrateDl.Mean)
Or derived from: DRB.PdcpSduVolumeDL / period_seconds
"""
        # DRB.PdcpSduVolumeDL is reported in kbit (TS 28.552)
        volume_kbit = self._sum_counter(
            cell_id, "DRB.PdcpSduVolumeDL",
            period_start, period_end)
        period_s = (period_end - period_start).total_seconds()
        if period_s == 0:
            return None
        throughput_mbps = volume_kbit / (period_s * 1000.0)
self._store_kpi(f"{cell_id}:DL_THROUGHPUT",
period_end, throughput_mbps)
return throughput_mbps
def compute_rrc_setup_time(self, cell_id: str,
period_start: datetime,
period_end: datetime) \
-> Optional[float]:
"""Average RRC Connection Setup Time (ms).
Derived from:
RRC.ConnEstabTimeMean or
(RRC.ConnEstabTimeSum / RRC.ConnEstabSucc)
"""
time_sum = self._sum_counter(
cell_id, "RRC.ConnEstabTimeSum",
period_start, period_end)
successes = self._sum_counter(
cell_id, "RRC.ConnEstabSucc",
period_start, period_end)
if successes == 0:
return None
avg_ms = time_sum / successes
self._store_kpi(f"{cell_id}:RRC_SETUP_TIME",
period_end, avg_ms)
return avg_ms
    def _sum_counter(self, cell_id, counter_name,
                     start, end) -> float:
        total = 0.0
        for key, counters in self.counter_store.items():
            # Keys are "element:cell_id:counter_name"; compare the
            # fields exactly so a search for cell "1" does not also
            # match cell "11", or RRC.ConnEstab match ConnEstabAtt
            _, cid, name = key.split(":", 2)
            if cid != cell_id or name != counter_name:
                continue
            for c in counters:
                if start <= c.collection_time <= end:
                    total += c.counter_value
        return total
    def _avg_counter(self, cell_id, counter_name,
                     start, end) -> float:
        values = []
        for key, counters in self.counter_store.items():
            _, cid, name = key.split(":", 2)
            if cid != cell_id or name != counter_name:
                continue
            for c in counters:
                if start <= c.collection_time <= end:
                    values.append(c.counter_value)
        return sum(values) / len(values) if values else 0.0
def _store_kpi(self, kpi_key, timestamp, value):
if kpi_key not in self.kpi_history:
self.kpi_history[kpi_key] = []
self.kpi_history[kpi_key].append((timestamp, value))
class AlarmCorrelator:
    """Correlates alarms across network layers.

    Identifies root causes by analyzing temporal and topological
    relationships between alarms from different network elements.
    """

    def __init__(self):
        self.active_alarms: Dict[str, X733Alarm] = {}
        self.correlation_rules: List[dict] = []
        self._init_rules()

    def _init_rules(self):
        """Define alarm correlation rules.

        Rules encode domain knowledge about cause-effect
        relationships in telecom networks.
        """
        self.correlation_rules = [
            {
                "name": "fiber_cut_radio_impact",
                "root_cause": {
                    "managed_object_class": "TransportLink",
                    "probable_cause": "loss-of-signal",
                },
                "symptoms": [
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "communication-failure"},
                    {"managed_object_class": "GnbDuFunction",
                     "probable_cause": "back-haul-failure"},
                ],
                "time_window_s": 60,
                "description": "Transport link failure causes "
                               "cell outages on connected gNBs",
            },
            {
                "name": "power_failure_cascade",
                "root_cause": {
                    "managed_object_class": "PowerSupply",
                    "event_type": AlarmEventType.ENVIRONMENT,
                },
                "symptoms": [
                    {"managed_object_class": "GnbDuFunction",
                     "probable_cause": "equipment-malfunction"},
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "communication-failure"},
                ],
                "time_window_s": 30,
                "description": "Power supply failure causes "
                               "full site outage",
            },
            {
                "name": "core_nf_overload",
                "root_cause": {
                    "managed_object_class": "AMFFunction",
                    "probable_cause": "threshold-crossed",
                    "specific_problem_pattern": ".*cpu.*|.*memory.*",
                },
                "symptoms": [
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "threshold-crossed",
                     "specific_problem_pattern": ".*CSSR.*"},
                ],
                "time_window_s": 120,
                "description": "Core NF overload causes cell-level "
                               "KPI degradation across many cells",
            },
        ]

    def process_alarm(self, alarm: X733Alarm) \
            -> Optional[str]:
        """Process incoming alarm and attempt correlation.

        Returns root_cause_alarm_id if correlated, None otherwise.
        """
        if alarm.perceived_severity == AlarmSeverity.CLEARED:
            self._clear_alarm(alarm)
            return None
        self.active_alarms[alarm.alarm_id] = alarm
        # Try to correlate with existing alarms
        for rule in self.correlation_rules:
            root_id = self._try_correlate(alarm, rule)
            if root_id:
                alarm.root_cause_alarm_id = root_id
                return root_id
        return None

    def _try_correlate(self, new_alarm: X733Alarm,
                       rule: dict) -> Optional[str]:
        """Try to correlate new alarm using a rule."""
        root_spec = rule["root_cause"]
        symptom_specs = rule["symptoms"]
        # Check if new alarm matches a symptom
        is_symptom = False
        for spec in symptom_specs:
            if self._alarm_matches_spec(new_alarm, spec):
                is_symptom = True
                break
        if not is_symptom:
            # Check if new alarm IS the root cause
            if self._alarm_matches_spec(new_alarm, root_spec):
                # Look for existing symptom alarms to correlate
                for aid, existing in self.active_alarms.items():
                    if aid == new_alarm.alarm_id:
                        continue
                    time_diff = abs(
                        (existing.event_time -
                         new_alarm.event_time).total_seconds())
                    if time_diff <= rule["time_window_s"]:
                        for spec in symptom_specs:
                            if self._alarm_matches_spec(
                                    existing, spec):
                                existing.root_cause_alarm_id = \
                                    new_alarm.alarm_id
                                existing.correlated_notifications\
                                    .append(
                                        new_alarm.notification_id)
            return None
        # New alarm is a symptom - find matching root cause
        for aid, existing in self.active_alarms.items():
            if aid == new_alarm.alarm_id:
                continue
            time_diff = abs(
                (existing.event_time -
                 new_alarm.event_time).total_seconds())
            if time_diff > rule["time_window_s"]:
                continue
            if self._alarm_matches_spec(existing, root_spec):
                new_alarm.correlated_notifications.append(
                    existing.notification_id)
                return existing.alarm_id
        return None

    def _alarm_matches_spec(self, alarm: X733Alarm,
                            spec: dict) -> bool:
        """Check if alarm matches correlation rule spec."""
        if "managed_object_class" in spec:
            if alarm.managed_object_class != \
                    spec["managed_object_class"]:
                return False
        if "probable_cause" in spec:
            if alarm.probable_cause != spec["probable_cause"]:
                return False
        if "event_type" in spec:
            if alarm.event_type != spec["event_type"]:
                return False
        if "specific_problem_pattern" in spec:
            pattern = spec["specific_problem_pattern"]
            if not re.search(pattern,
                             alarm.specific_problem,
                             re.IGNORECASE):
                return False
        return True

    def _clear_alarm(self, alarm: X733Alarm):
        """Process alarm clear.

        Find and remove the active alarm, matching by
        managed object instance + probable cause.
        """
        to_remove = []
        for aid, active in self.active_alarms.items():
            if (active.managed_object_instance ==
                    alarm.managed_object_instance and
                    active.probable_cause ==
                    alarm.probable_cause):
                to_remove.append(aid)
        for aid in to_remove:
            del self.active_alarms[aid]
    def get_root_cause_summary(self) \
            -> List[Dict[str, Any]]:
        """Get summary of active root cause alarms
        and their correlated symptoms."""
        roots = {}
        for aid, alarm in self.active_alarms.items():
            if alarm.root_cause_alarm_id is None:
                # This might be a root cause
                roots[aid] = {
                    "alarm": alarm,
                    "symptoms": [],
                }
        for aid, alarm in self.active_alarms.items():
            if alarm.root_cause_alarm_id and \
                    alarm.root_cause_alarm_id in roots:
                roots[alarm.root_cause_alarm_id]["symptoms"]\
                    .append(alarm)
        return [
            {"root_cause": info["alarm"],
             "symptom_count": len(info["symptoms"]),
             "symptoms": info["symptoms"]}
            for info in roots.values()
            if len(info["symptoms"]) > 0
        ]
The key domain insight that AI tools miss: telecom KPIs are computed from specific 3GPP-defined counter names (TS 28.552), not from arbitrary application metrics. CSSR uses “RRC.ConnEstabAtt” and “RRC.ConnEstabSucc” — not custom counters you invent. Alarm correlation must be topology-aware: a fiber cut at one transport link can cause hundreds of cell-level alarms, and the operator needs to see one root cause, not 200 individual tickets. Claude Code excels at understanding the KPI formulas and their counter dependencies because it can reference the 3GPP performance measurement specifications. Cursor helps when YANG models, counter collection code, and KPI computation are in separate files. Copilot generates reasonable data aggregation code but uses generic metric names rather than the 3GPP-standardized counter nomenclature.
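To make the counter-nomenclature point concrete, here is a minimal standalone sketch of the CSSR formula using the TS 28.552 counter names. The function name and the convention of passing pre-aggregated counter sums are illustrative assumptions, not part of the class shown above.

```python
from typing import Optional

def compute_cssr(rrc_conn_estab_att: float,
                 rrc_conn_estab_succ: float) -> Optional[float]:
    """Call Setup Success Rate (%) per TS 28.552 counters.

    CSSR = (RRC.ConnEstabSucc / RRC.ConnEstabAtt) * 100
    Inputs are counter sums over the measurement period.
    """
    if rrc_conn_estab_att == 0:
        # No attempts in the period: the KPI is undefined, not 0%.
        return None
    return (rrc_conn_estab_succ / rrc_conn_estab_att) * 100.0
```

Returning `None` rather than 0.0 for an empty period matters: a cell with no establishment attempts is not a cell with a 0% success rate, and conflating the two pollutes KPI dashboards.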
After testing all major tools against telecom-specific tasks, these are the most common and most dangerous errors:
- Ignoring ASN.1 encoding constraints. AI tools generate JSON or protobuf serialization for RRC messages that must be ASN.1 PER-encoded. PER (Packed Encoding Rules) is a bit-level encoding where field positions are determined by the ASN.1 schema, not by field names — and a single bit offset error makes the entire message undecodable. If your spec says ASN.1, the code must use ASN.1.
- Missing timer management in protocol state machines. Every telecom protocol procedure is guarded by timers, and timer expiry is not an error — it is a defined state transition that must be handled explicitly. AI tools generate state machines that handle messages but ignore timers entirely, producing implementations that hang indefinitely on message loss.
- Generating stateless SIP handling. SIP is inherently stateful: dialogs have state (early, confirmed, terminated), transactions have state (calling, proceeding, completed), and subscriptions have state. AI tools trained primarily on HTTP code generate stateless request handlers that cannot maintain call state across re-INVITEs or handle forked responses.
- Not understanding 3GPP IE optionality. Information Elements in 3GPP messages have specific optionality rules: mandatory, conditional (present only when a condition is true), and optional. AI tools often include all IEs unconditionally or omit conditional IEs whose presence is required by the current protocol state, producing messages that fail conformance testing.
- Using HTTP/REST for signaling that requires SCTP. NG-AP (between gNB and AMF) runs over SCTP, not TCP or HTTP. SCTP provides message-oriented delivery with multi-homing and multi-streaming that are critical for signaling transport. AI tools default to HTTP/REST or TCP sockets, missing the SCTP-specific features (stream-based ordering, path failover) that the protocol depends on.
- Ignoring HARQ process management in MAC layer code. The MAC layer manages up to 16 HARQ processes in NR, each independently tracking transport block transmissions and retransmissions. AI tools generate MAC code that handles one transmission at a time, missing the concurrent HARQ process management that is fundamental to NR throughput.
- Missing GTP-U sequence number handling. GTP-U G-PDU messages carry optional sequence numbers used for reordering at handover boundaries. AI tools either ignore sequence numbers entirely or implement them incorrectly, causing packet reordering during handover that breaks TCP throughput.
- Generating blocking I/O in real-time signaling paths. Signaling processing has sub-millisecond latency requirements. AI tools generate synchronous database queries, file I/O, or DNS lookups in the signaling hot path, introducing latency spikes that cause timer expiries and call failures under load.
- Not handling protocol version negotiation. Telecom protocols evolve across 3GPP releases (Release 15, 16, 17, 18), and a UE must negotiate capabilities with the network. AI tools generate code that assumes a single protocol version, producing implementations that fail when connected to networks running different releases.
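As a concrete illustration of the timer point above, here is a minimal sketch of an RRC connection-establishment state machine in which T300 expiry is a defined transition back to idle, not an unhandled error. The class shape, the tick-based timer check, and the 1000 ms value are illustrative assumptions; the real T300 value is signalled to the UE in SIB1.

```python
from enum import Enum, auto
from typing import Optional

class RrcState(Enum):
    IDLE = auto()
    CONNECTING = auto()  # RRCSetupRequest sent, T300 running
    CONNECTED = auto()

class RrcEstablishmentFsm:
    """Sketch: timer expiry is a state transition, not an exception."""

    T300_MS = 1000  # illustrative; real value comes from SIB1

    def __init__(self) -> None:
        self.state = RrcState.IDLE
        self._t300_deadline: Optional[int] = None

    def send_rrc_setup_request(self, now_ms: int) -> None:
        self.state = RrcState.CONNECTING
        self._t300_deadline = now_ms + self.T300_MS  # start T300

    def on_rrc_setup(self) -> None:
        self._t300_deadline = None  # stop T300
        self.state = RrcState.CONNECTED

    def on_tick(self, now_ms: int) -> None:
        # T300 expiry: a defined transition to IDLE. A full
        # implementation would also reset MAC and inform upper layers.
        if (self.state is RrcState.CONNECTING
                and self._t300_deadline is not None
                and now_ms >= self._t300_deadline):
            self._t300_deadline = None
            self.state = RrcState.IDLE
```

The point generalizes: every state that waits for a peer message gets a deadline when it is entered, and the expiry path is enumerated in the transition table alongside the message-received paths.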
- $0/mo — Solo telecom researcher / hobbyist: GitHub Copilot Free (2,000 completions/month covers protocol experimentation) + Gemini CLI Free (unlimited during preview for reasoning through spec sections). Enough for learning 3GPP specs and building protocol prototypes.
- $10/mo — Protocol developer: Copilot Pro for faster inline completions during repetitive protocol message structure coding. Good for grinding through hundreds of IE definitions and message encoding functions.
- $20/mo — Senior telecom engineer (pick one): Claude Code ($20/mo) for spec interpretation, protocol state machine reasoning, timer interaction analysis, and conformance test design. OR Cursor Pro ($20/mo) for multi-file protocol stack development where RRC/PDCP/RLC/MAC layers reference each other. Choose Claude if your work is spec-heavy (interpreting 3GPP procedures, verifying state machine correctness). Choose Cursor if your work is implementation-heavy (writing and refactoring stack code).
- $30/mo — Lead telecom architect: Claude Code ($20/mo) + Copilot Pro ($10/mo). Claude for spec reasoning, architecture decisions, protocol correctness verification. Copilot for fast inline completions during routine coding. Best combination for someone who both designs and implements protocol stacks.
- $40-99/mo — Telecom vendor team seat: Cursor Business ($40/seat) for teams with shared protocol stack codebases — the multi-file indexing across shared projects is critical. Or enterprise tiers with on-premise deployment for vendors handling classified telecom infrastructure. Add Claude Code team tier for protocol reasoning alongside IDE completions.
Before committing any AI-generated code to a telecom protocol implementation, verify:
- Every protocol state has explicit timer management. Every state that waits for a message must have a guard timer, and every timer expiry must trigger a defined state transition — not an exception, not a TODO, but a transition to a specific state with specific actions.
- ASN.1 encoding matches the spec. If the 3GPP spec defines a message using ASN.1, verify that the encoding uses the correct PER variant (aligned or unaligned), that optional IEs use the correct presence bitmap encoding, and that ENUMERATED and CHOICE types use the correct index encoding.
- SIP dialog state is maintained correctly. Verify that To-tags are captured from responses, that Record-Route headers are reversed for route sets, that CSeq numbers increment per-dialog, and that re-INVITE is handled as a mid-dialog operation, not a new dialog.
- GTP-U TEID 0 is never used for user data. TEID 0 is reserved for GTP-U path management (echo request/response). Verify that tunnel allocation starts at TEID 1.
- SCTP multi-homing is configured for signaling. If the code handles NG-AP, S1-AP, or Diameter signaling, verify it uses SCTP with at least two association paths and handles SCTP COMM_LOST/RESTART notifications.
- No blocking I/O in the signaling path. Every operation in the signaling hot path must be non-blocking. Database lookups, DNS resolution, logging to disk, and external API calls must be asynchronous or offloaded to worker threads.
- KPI formulas use 3GPP counter names. CSSR uses RRC.ConnEstabAtt and RRC.ConnEstabSucc, not custom metrics. CDR uses RRC.ConnRel.Abnormal, not application error rates. Verify counter names match TS 28.552.
- Handover handling preserves data plane continuity. During intra-NR handover, PDCP PDUs must be forwarded from source to target gNB, and the UE must re-establish the RLC layer on the target cell without losing in-transit user data.
- Error Indication is sent for unknown TEIDs. When a GTP-U G-PDU arrives with a TEID that does not match any active tunnel, the correct response is an Error Indication message — not silent discard.
- Protocol version compatibility is handled. Verify that the code includes capability negotiation or version checking for peers running different 3GPP releases, and that mandatory IEs for the negotiated version are always present.
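Two of the GTP-U checks above (TEID 0 is reserved, unknown TEIDs get an Error Indication) can be captured in a few lines. This is a sketch with a hypothetical tunnel table; the Error Indication message type value 26 is taken from TS 29.281, while the class and method names are assumptions for illustration.

```python
from typing import Optional, Set

GTPU_MSG_ERROR_INDICATION = 26  # GTP-U message type per TS 29.281

class GtpuTunnelTable:
    """Hypothetical tunnel table enforcing the two checklist rules."""

    def __init__(self) -> None:
        self._next_teid = 1          # TEID 0 reserved for path management
        self._active: Set[int] = set()

    def allocate_teid(self) -> int:
        teid = self._next_teid
        self._next_teid += 1
        self._active.add(teid)
        return teid

    def on_gpdu(self, teid: int) -> Optional[int]:
        """Return a message type to send back, or None to forward."""
        if teid == 0 or teid not in self._active:
            # Reserved or unknown TEID: reply with Error Indication,
            # never silently discard the G-PDU.
            return GTPU_MSG_ERROR_INDICATION
        return None  # valid tunnel: deliver the payload
```

Checks like these are cheap to encode as unit tests against AI-generated tunnel management code, and they catch the silent-discard pattern before it reaches interop testing.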
Related Guides
- AI Coding Tools for Networking Engineers (2026) — Routing protocols, network automation, BGP, MPLS, SDN, load balancing
- AI Coding Tools for Embedded/IoT Engineers (2026) — Firmware, RTOS, bare-metal, sensor integration, power management
- AI Coding Tools for Performance Engineers (2026) — Profiling, benchmarking, latency optimization, throughput tuning
- AI Coding Tools for Backend Engineers (2026) — API design, distributed systems, database patterns
- AI Coding Tools for Systems Programmers (2026) — OS internals, memory management, concurrency, low-level optimization
- AI Coding Tools for DevOps Engineers (2026) — CI/CD, infrastructure-as-code, monitoring, deployment
Compare all the tools and pricing on our main comparison table, check the cheapest tools guide for budget options, or see the enterprise guide for organizational procurement and data governance considerations.