CodeCosts

AI Coding Tool News & Analysis

AI Coding Tools for Telecommunications Engineers 2026: 5G NR Protocol Stacks, SIP/VoIP, Network Function Virtualization, Real-Time Signaling, 3GPP Conformance & Network Telemetry Guide

Telecommunications engineering is the discipline where a single mishandled timer expiry in an RRC state machine can drop ten thousand active calls, where a SIP parser that misreads a Via header routes voice traffic into a black hole, and where the protocol specifications you implement against — the 3GPP TS 38.xxx series alone runs to tens of thousands of pages — are updated every quarterly release cycle. The global telecommunications industry generates over $1.7 trillion in annual revenue, and the engineers who build, maintain, and evolve this infrastructure work at the intersection of real-time systems programming, formal protocol specification, distributed systems architecture, and regulatory compliance that spans every jurisdiction on earth. You are not building a web application that can tolerate a 500ms hiccup — you are building the infrastructure that web applications run on, where a 1ms latency violation in a URLLC bearer breaks the contract with an autonomous vehicle relying on that link, where a missed HARQ retransmission in the MAC layer cascades into an RLC reassembly failure that corrupts a PDCP SDU carrying a VoNR voice frame, and where “five nines” availability (99.999%) means you are allowed 5.26 minutes of downtime per year, not per month.

This guide evaluates every major AI coding tool through the lens of what telecommunications engineers actually build. We tested each tool against seven core task areas: 5G NR protocol stack implementation (RRC state machines, MAC scheduling, RLC segmentation, PDCP ciphering, SDAP QoS mapping, ASN.1 PER encoding), SIP/VoIP and IMS integration (SIP message parsing, SDP offer/answer, dialog state machines, Diameter interfaces, RTP/RTCP media handling, VoLTE/VoNR), NFV/CNF orchestration (MANO lifecycle management, cloud-native network functions on Kubernetes, 5G Service Based Architecture, ETSI NFV standards), protocol state machine design (formal correctness, timer management, race condition handling, guard conditions), real-time signaling systems (HARQ timing, GTP-U tunnel management, SCTP multi-homing, PFCP session control, URLLC latency budgets), 3GPP conformance and testing (TTCN-3 test suites, conformance test cases, protocol trace analysis), and network telemetry and observability (SNMP, NETCONF/YANG, gNMI streaming, KPI computation, alarm correlation). Every code example is production-realistic — real 3GPP information element names, real timer identifiers, real protocol message structures.

If your work focuses more on general network infrastructure and routing, see our Networking Engineers guide. If you build firmware for radio hardware or IoT devices on the network edge, see the Embedded/IoT Engineers guide. If your primary concern is latency profiling and throughput optimization at the system level, see the Performance Engineers guide.

TL;DR

  • Best free ($0): GitHub Copilot Free — decent protocol scaffolding and boilerplate for message structures; 2,000 completions/mo covers light protocol development.
  • Best overall ($20/mo): Cursor Pro — multi-file context handles protocol stack layers, state machines, and configuration together across your project tree.
  • Best for reasoning ($20/mo): Claude Code — strongest at 3GPP specification interpretation, protocol state machine correctness verification, and signaling flow analysis across layers.
  • Best combo ($30/mo): Claude Code + Copilot Pro — Claude for spec reasoning and protocol correctness, Copilot for fast inline completions during routine coding.
  • Budget ($0): Copilot Free + Gemini CLI Free.

Why Telecommunications Engineering Is Different

Telecommunications engineers evaluate AI tools on a fundamentally different axis than application developers. A web developer asks “does this tool write good React?” A telecom engineer asks “does this tool understand that the T310 timer expiry in RRC_CONNECTED triggers a transition to RRC_IDLE only after T311 has also expired without a suitable cell being found, and that getting this sequence wrong means the UE drops an active VoNR call instead of performing cell reselection?” The evaluation criteria are unique to this domain:

  • 5G NR protocol stacks are layered complexity that must interlock precisely. The 3GPP TS 38.xxx specification series defines the New Radio access technology across multiple protocol layers, each with its own state machines, timers, and data processing requirements. The RRC (Radio Resource Control) layer, specified in TS 38.331, manages the connection between UE and gNB through three primary states: RRC_IDLE (no active connection, cell reselection based on SIB information), RRC_INACTIVE (connection context preserved at gNB, UE performs RNA-based mobility), and RRC_CONNECTED (active data transfer, handover managed by gNB). Each state transition involves specific message exchanges — RRCSetupRequest, RRCSetup, RRCSetupComplete for initial access; RRCReconfiguration for bearer modification, handover, and measurement configuration; RRCRelease for connection teardown with optional suspend indication for transition to RRC_INACTIVE. Every message is encoded using ASN.1 PER (Packed Encoding Rules) as specified in TS 38.331 Annex A, where a single bit-offset error in encoding an RRC Information Element produces a message the peer entity cannot decode, triggering an integrity check failure and connection release. Below RRC, the PDCP layer (TS 38.323) handles header compression using ROHC profiles (RFC 5795), integrity protection using 128-bit keys with NIA algorithms, ciphering using NEA algorithms, reordering of out-of-sequence SDUs using a reordering timer (t-Reordering), and duplicate detection using a sliding receive window. The RLC layer (TS 38.322) operates in three modes: Transparent Mode (TM) for broadcast, Unacknowledged Mode (UM) for voice and real-time data where retransmission is worse than loss, and Acknowledged Mode (AM) where ARQ retransmissions with configurable poll timers (t-PollRetransmit) and status triggers ensure reliable delivery. 
The MAC layer (TS 38.321) handles scheduling through DCI (Downlink Control Information) formats, HARQ process management with up to 16 processes in NR, BSR (Buffer Status Reports) for uplink scheduling requests, PHR (Power Headroom Reports), bandwidth part (BWP) switching, and logical channel prioritization. A change in MAC scheduling parameters affects RLC segmentation sizes, which affects PDCP reordering behavior, which affects RRC measurement reporting timing — and an AI tool that generates code for one layer without understanding its impact on adjacent layers produces a protocol stack that fails under load.
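The t-Reordering mechanics described above can be sketched in miniature. This is an illustrative model, not TS 38.323-conformant code: real PDCP operates on 32-bit COUNT values built from HFN and SN, tracks RX_REORD rather than flushing the whole buffer on expiry, and delivers SDUs only after integrity verification. The PdcpReceiver class and its method names are invented for the sketch.

```python
class PdcpReceiver:
    """Toy model of PDCP receive-side reordering (simplified from TS 38.323)."""

    def __init__(self, t_reordering_ms=100):
        self.rx_deliv = 0              # first COUNT not yet delivered up the stack
        self.rx_next = 0               # next expected COUNT
        self.buffer = {}               # COUNT -> SDU held behind a gap
        self.t_reordering_ms = t_reordering_ms
        self.reorder_deadline = None   # absolute expiry time in ms, or None

    def receive(self, count, sdu, now_ms):
        """Accept one SDU; return whatever is now deliverable in order."""
        if count < self.rx_deliv or count in self.buffer:
            return []                  # duplicate detection: silently discard
        self.buffer[count] = sdu
        if count >= self.rx_next:
            self.rx_next = count + 1
        delivered = self._deliver_in_order()
        # A gap remains and no timer is running: start t-Reordering
        if self.reorder_deadline is None and self.rx_deliv < self.rx_next:
            self.reorder_deadline = now_ms + self.t_reordering_ms
        return delivered

    def on_timer(self, now_ms):
        """On t-Reordering expiry, give up on the gap and flush (simplified)."""
        if self.reorder_deadline is None or now_ms < self.reorder_deadline:
            return []
        self.reorder_deadline = None
        delivered = [self.buffer.pop(c) for c in sorted(self.buffer)]
        self.rx_deliv = self.rx_next   # gap abandoned; window advances
        return delivered

    def _deliver_in_order(self):
        out = []
        while self.rx_deliv in self.buffer:
            out.append(self.buffer.pop(self.rx_deliv))
            self.rx_deliv += 1
        if self.rx_deliv == self.rx_next:
            self.reorder_deadline = None   # no gap left: stop the timer
        return out
```

Even this toy version shows the cross-layer coupling: the t-Reordering value must be tuned against RLC retransmission latency, or PDCP flushes SDUs that RLC AM was about to deliver.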
  • SIP/VoIP and IMS are stateful protocol nightmares. The Session Initiation Protocol (RFC 3261) is a text-based signaling protocol that appears simple until you encounter the real-world complexity of a production IMS (IP Multimedia Subsystem) deployment. A basic call setup requires an INVITE request flowing through P-CSCF (Proxy-Call Session Control Function), I-CSCF (Interrogating-CSCF for HSS lookup), and S-CSCF (Serving-CSCF for service execution), each adding Via headers, Record-Route headers, and P-headers for charging and access network information. The SDP (Session Description Protocol, RFC 4566) body in the INVITE carries the offer with media descriptions — codec capabilities (AMR-WB for VoLTE, EVS for VoNR, with specific mode-set parameters), RTP port numbers, ICE candidates for NAT traversal, DTLS-SRTP fingerprints for media encryption, and bandwidth modifiers. The answer in the 200 OK must intersect the offered codecs, select compatible parameters, and include its own transport addresses. Dialog state management tracks early dialogs (after provisional 1xx responses with a To-tag), confirmed dialogs (after 2xx), and terminated dialogs (after BYE or error responses) — and a single INVITE can spawn multiple early dialogs when it reaches multiple endpoints through a forking proxy. Transaction state machines (RFC 3261 Section 17) manage retransmissions for unreliable transports (UDP), with Timer A for INVITE retransmit intervals, Timer B for INVITE transaction timeout (64*T1 = 32 seconds), Timer D for absorbing INVITE response retransmissions after the client transaction completes (at least 32 seconds for UDP), and Timer F for non-INVITE transaction timeout. The Diameter protocol interfaces in IMS add another layer: Cx (HSS to I-CSCF/S-CSCF for registration and routing), Rx (P-CSCF to PCRF for policy and charging rules), Gx (PCRF to PCEF for policy enforcement), Gy (OCS for online charging), and Sh (AS to HSS for subscriber data).
Each Diameter interface has its own AVP (Attribute-Value Pair) dictionary, command codes, and state machines. An AI tool that generates “SIP handling code” using stateless request-response patterns like HTTP produces a VoIP system that drops calls on network packet loss, fails on forked INVITE responses, and cannot maintain dialog state across re-INVITE for hold/resume or codec renegotiation.
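The codec intersection step of the offer/answer exchange can be sketched as follows. This is a deliberately minimal model built around an invented intersect_codecs helper: real answer generation parses rtpmap/fmtp attributes, matches mode-set parameters for AMR-WB/EVS, and must reuse the offerer's payload type numbers, which the sketch preserves by carrying them through.

```python
def intersect_codecs(offered, supported):
    """Return the offered codecs we support, in the offerer's preference order.

    offered: list of (payload_type, codec_name) pairs from the SDP offer.
    supported: codec names the answerer can handle.
    """
    supported_names = {name.lower() for name in supported}
    return [entry for entry in offered if entry[1].lower() in supported_names]

# Offer lists codecs in preference order, with their RTP payload types
offer = [(96, "EVS"), (97, "AMR-WB"), (98, "AMR"), (0, "PCMU")]
answer = intersect_codecs(offer, ["AMR-WB", "PCMU"])
# answer keeps the offerer's payload types: [(97, "AMR-WB"), (0, "PCMU")]
```

The detail that trips up generic code generation is exactly the one the sketch hedges on: an answer that renumbers payload types, or selects a codec without matching its fmtp mode-set, produces one-way audio that passes every unit test and fails in interop.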
  • Network Function Virtualization transforms telecom infrastructure. The telecom industry’s migration from purpose-built hardware appliances to virtualized and cloud-native network functions represents the largest infrastructure transformation in its history. ETSI NFV defines the MANO (Management and Orchestration) architecture: NFVO (NFV Orchestrator) manages network service lifecycle and resource orchestration across VIMs, VNFM (VNF Manager) handles individual VNF lifecycle operations (instantiation, scaling, healing, termination), and VIM (Virtualized Infrastructure Manager, typically OpenStack or Kubernetes) manages compute, storage, and networking resources. The 5G Core (5GC) takes this further with the Service Based Architecture (SBA) defined in TS 23.501, where network functions communicate via HTTP/2-based service interfaces: AMF (Access and Mobility Management Function) handles registration, connection, and mobility management; SMF (Session Management Function) manages PDU sessions and UPF selection; UPF (User Plane Function) processes and forwards user data packets; PCF (Policy Control Function) provides policy rules; UDM (Unified Data Management) stores subscriber data; AUSF (Authentication Server Function) handles authentication; NRF (Network Repository Function) provides service discovery; and NSSF (Network Slice Selection Function) selects network slices. Each NF must support service registration with NRF, OAuth2-based authorization for inter-NF communication (TS 33.501), graceful scaling (draining existing sessions before removing instances), heartbeat-based health monitoring, and the N32 interface for inter-PLMN security. 
Cloud-native deployments add Kubernetes-specific concerns: StatefulSets for NFs with persistent state (SMF session state, UDM subscriber data), custom operators for lifecycle management, Multus for multiple CNI attachments (separate interfaces for signaling, user plane, and management), and DPDK/SR-IOV for user plane data path performance. An AI tool that treats network function deployment like a standard web microservice — stateless, horizontally scalable, restart-anywhere — produces deployments that lose signaling state on pod restart, break mid-call during scaling events, and cannot meet the 99.999% availability SLA that telecom operators contractually guarantee.
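The NRF registration and heartbeat duties can be sketched at the data-model level. The field names follow the NFProfile shape of the Nnrf_NFManagement service (TS 29.510), but treat this as an assumption-laden sketch: a real NF sends the profile via an HTTP/2 PUT to the NRF, adopts whatever heartbeat interval the NRF returns, and attaches OAuth2 access tokens to every request, none of which is shown here.

```python
import uuid

def build_nf_profile(nf_type, service_names, ipv4, heartbeat_s=10):
    """Build a minimal NFProfile dict for registration with the NRF.

    nf_type: e.g. "SMF" or "AMF"; service_names: e.g. ["nsmf-pdusession"].
    The NRF may override heartBeatTimer in its registration response.
    """
    return {
        "nfInstanceId": str(uuid.uuid4()),
        "nfType": nf_type,
        "nfStatus": "REGISTERED",
        "heartBeatTimer": heartbeat_s,
        "ipv4Addresses": [ipv4],
        "nfServices": [
            {
                "serviceInstanceId": f"{name}-1",
                "serviceName": name,
                "versions": [{"apiVersionInUri": "v1",
                              "apiFullVersion": "1.0.0"}],
                "scheme": "https",
                "nfServiceStatus": "REGISTERED",
            }
            for name in service_names
        ],
    }

def heartbeat_patch_body():
    """JSON Patch body sent periodically to refresh the NF registration."""
    return [{"op": "replace", "path": "/nfStatus", "value": "REGISTERED"}]

profile = build_nf_profile("SMF", ["nsmf-pdusession"], "10.0.0.5")
```

A missed heartbeat window is not a soft failure: the NRF marks the NF SUSPENDED and consumers stop discovering it, which is why the heartbeat loop belongs in the NF's liveness-critical path, not a best-effort background job.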
  • Protocol state machines must be formally correct. Telecommunications protocols are defined as state machines, and the correctness of those state machines is not a quality-of-service concern — it is a functional requirement. The UE registration procedure (TS 24.501 Section 5.5.1) involves states from DEREGISTERED through REGISTRATION-INITIATED to REGISTERED, with sub-states for normal service, limited service, attempting registration update, and PLMN search. Each transition has preconditions (guard conditions), actions (send message, start timer, update context), and post-conditions. Timer T3510 guards the Registration Request: if no response arrives within T3510 expiry, the UE retransmits up to four times (configurable by the network), and if all attempts fail, enters a backoff state governed by T3511 or T3502 depending on the cause value in the rejection. The PDU Session Establishment procedure (TS 24.501 Section 6.4.1) adds SM states (PDU_SESSION_INACTIVE, PDU_SESSION_ACTIVE, PDU_SESSION_MODIFICATION_PENDING) with their own timer set (T3580, T3581, T3582). Handover state machines are particularly complex: intra-gNB handover, inter-gNB Xn handover (TS 38.423), inter-gNB N2 handover via AMF (TS 23.502 Section 4.9.1), and inter-RAT handover to LTE (TS 23.502 Section 4.11) each follow different message sequences, involve different network entities, and have different failure recovery procedures. Race conditions arise when simultaneous events hit the state machine: what happens when a Handover Command arrives while a Measurement Report is being transmitted? What if an RRC Connection Release arrives during an ongoing Security Mode procedure? Every race condition must be specified and handled. 
An AI tool that generates state machine code with “TODO: handle edge cases” comments instead of explicit handling for every concurrent event is generating a protocol implementation that will fail in the field — not during lab testing with a single UE, but when thousands of UEs hit rare-but-inevitable timing coincidences.
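The T3510 retry discipline described above reduces to a guarded attempt counter, which can be sketched as follows. The class and state-string names are invented for illustration, and the real procedure in TS 24.501 also branches on the 5GMM cause carried in a REGISTRATION REJECT, which selects between T3511 and T3502 backoff.

```python
MAX_ATTEMPTS = 5   # initial Registration Request plus four retransmissions

class RegistrationProcedure:
    """Toy model of the T3510-guarded registration attempt logic."""

    def __init__(self):
        self.state = "DEREGISTERED"
        self.attempts = 0

    def start(self):
        """Send Registration Request and start T3510 (timer itself elided)."""
        self.state = "REGISTRATION-INITIATED"
        self.attempts = 1

    def on_t3510_expiry(self):
        """No response within T3510: retransmit or give up with backoff."""
        if self.attempts < MAX_ATTEMPTS:
            self.attempts += 1          # retransmit Registration Request,
            # ...and restart T3510 here
        else:
            # All attempts exhausted: enter backoff, governed by T3511
            # or T3502 depending on the rejection cause (not modeled)
            self.state = "DEREGISTERED.ATTEMPTING-REGISTRATION"

    def on_registration_accept(self):
        self.state = "REGISTERED"
        self.attempts = 0
```

Note what the sketch makes explicit that a "TODO: handle edge cases" version does not: every path out of REGISTRATION-INITIATED is enumerated, including the exhausted-retries path that determines whether a UE storm hammers the AMF or backs off.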
  • Real-time signaling has sub-millisecond constraints. Telecom signaling operates under latency budgets that make web application response times look glacial. In 5G NR, the HARQ (Hybrid Automatic Repeat Request) feedback timing is determined by the K1 value in DCI format 1_0/1_1, which specifies the slot offset between PDSCH reception and HARQ-ACK transmission — typically 1-8 slots, where each slot at 30 kHz subcarrier spacing is 0.5ms, meaning the UE must decode a transport block, check the CRC, and prepare ACK/NACK feedback in under 0.5ms in the tightest configuration. On the network side, the SCTP (Stream Control Transmission Protocol, RFC 4960) transport for signaling interfaces (S1-AP/NG-AP between gNB and core, Diameter between core NFs) requires multi-homing management — maintaining primary and backup paths, detecting path failures via heartbeat chunks, and performing failover without losing in-flight signaling messages. GTP-U (GPRS Tunneling Protocol User Plane, TS 29.281) tunnel management on the UPF must process millions of packets per second, each requiring TEID (Tunnel Endpoint Identifier) lookup, QoS marking based on QFI (QoS Flow Identifier), usage metering for charging, and lawful intercept duplication — all at line rate on 100Gbps interfaces. PFCP (Packet Forwarding Control Protocol, TS 29.244) sessions between SMF and UPF must be established within the time budget of a PDU Session Establishment procedure (typically under 1 second E2E including UE-gNB-AMF-SMF signaling). URLLC (Ultra-Reliable Low-Latency Communication) bearers target 1ms one-way user plane latency with 99.999% reliability, requiring pre-configured scheduling grants, mini-slot transmission, and configured grant Type 1/Type 2 mechanisms. 
An AI tool that generates signaling code using synchronous I/O, garbage-collected languages without deterministic latency guarantees, or standard TCP sockets where SCTP is required produces systems that fail timing requirements under load — and in telecom, a timing failure is a service failure.
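The K1 arithmetic is simple but unforgiving, and can be sketched directly. The helper name below is invented; it computes only the nominal slot offset implied by K1 and the numerology, not the UE processing-capability (N1) constraints that further tighten the budget.

```python
def harq_ack_deadline_ms(pdsch_slot, k1, mu):
    """Return (ack_slot, budget_ms) for HARQ-ACK feedback.

    pdsch_slot: slot index in which the PDSCH was received.
    k1: PDSCH-to-HARQ feedback offset signaled in DCI format 1_0/1_1.
    mu: NR numerology; slot duration is 1 / 2**mu milliseconds
        (mu=0: 15 kHz SCS, 1 ms slots; mu=1: 30 kHz SCS, 0.5 ms slots).
    """
    slot_ms = 1.0 / (2 ** mu)
    ack_slot = pdsch_slot + k1
    return ack_slot, k1 * slot_ms

# 30 kHz SCS (mu=1), K1=1: the UE must decode the transport block,
# check the CRC, and prepare ACK/NACK within one 0.5 ms slot
ack_slot, budget = harq_ack_deadline_ms(pdsch_slot=10, k1=1, mu=1)
# ack_slot == 11, budget == 0.5
```

The point of writing it out is the units: K1 is a slot count, not a time, so the same K1 value halves the processing budget every time the numerology steps up, which is exactly the trap for code that hardcodes millisecond assumptions.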
  • Regulatory and standards compliance is non-negotiable. Telecommunications is one of the most heavily regulated industries on earth. In the United States, the FCC regulates spectrum usage under Title 47 CFR Parts 22 (public mobile services), 24 (PCS), 27 (miscellaneous wireless), and 30 (upper microwave for 5G mmWave), with each band having specific technical rules for transmit power, out-of-band emissions, and interference protection. E911 requirements (FCC 07-166) mandate that wireless carriers provide location information for emergency calls — horizontal accuracy within 50 meters for 80% of calls using dispatchable location or x/y coordinates, and vertical accuracy within 3 meters for 80% of calls in buildings where z-axis location is required. The transition to NG-911 (NENA i3 architecture) adds SIP-based call routing, GIS-based location determination, and ESInet/PSAP connectivity requirements. CALEA (Communications Assistance for Law Enforcement Act) requires carriers to implement lawful intercept capabilities: mediation functions that can intercept call-identifying information (pen register) and call content (wiretap) based on court-ordered warrants, delivering intercepted data in standard formats (ATIS/TIA J-STD-025B) to law enforcement agencies. Number portability through the NPAC (Number Portability Administration Center) requires SOA (Service Order Administration) interface integration for porting requests, LRN (Location Routing Number) based call routing, and dip queries to carrier databases. 3GPP conformance testing uses TTCN-3 (Testing and Test Control Notation version 3) test suites defined by ETSI, with specific test cases for every protocol procedure. Each of these regulatory requirements translates directly to code — and non-compliance means FCC fines (up to $500,000 per violation for willful violations), loss of operating licenses, and criminal liability. 
An AI tool that does not understand the regulatory context behind the code it generates produces technically functional but legally non-compliant systems.
  • Network telemetry and observability at scale requires domain-specific tooling. A large mobile operator manages hundreds of thousands of network elements — gNBs, core NFs, transport switches, microwave links — each generating performance counters, alarms, and configuration events. Legacy management uses SNMP (Simple Network Management Protocol) with MIBs (Management Information Bases) that define the counters and configuration parameters available on each element type — and telecom-specific MIBs (like the 3GPP-defined MIBs for eNB/gNB performance counters) run to thousands of OID entries. Modern management uses NETCONF (RFC 6241) with YANG models (RFC 7950) for configuration and state data, where a single network element may implement dozens of YANG modules with hundreds of containers, lists, and leaf nodes. gNMI (gRPC Network Management Interface) provides streaming telemetry over a persistent subscription — the collector subscribes once, and the network element pushes counter updates at configurable intervals (sample mode) or on change (on-change mode), eliminating polling overhead. The raw counters must be aggregated into KPIs (Key Performance Indicators): CSSR (Call Setup Success Rate) calculated from successful RRC Connection Setup completions divided by RRC Connection Setup attempts, CDR (Call Drop Rate) from abnormal RRC connection releases divided by total active connections, HOSR (Handover Success Rate) from successful handover completions divided by handover attempts, throughput per cell and per UE, latency percentiles, and dozens more. Alarm management follows the X.733 structured alarm model (ITU-T Recommendation) with mandatory fields: managed object class and instance, event type (communications, processing, environment, quality of service, equipment), probable cause (over 100 defined causes like “loss of signal”, “threshold crossed”, “software error”), perceived severity (critical, major, minor, warning, indeterminate, cleared), and additional text.
Root cause analysis must correlate alarms across layers — a fiber cut (transport alarm) causes hundreds of cell-level alarms (radio alarms) that should be correlated to a single root cause rather than triggering hundreds of separate investigation tickets. An AI tool generating telecom observability code must understand these domain-specific data models, KPI formulas, and alarm correlation patterns — generic application monitoring tools like Prometheus with default instrumentation miss the domain-specific semantics entirely.
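The KPI formulas above reduce to guarded ratios over raw counters. A minimal sketch, with invented counter names (real counter families are vendor-specific; the 3GPP-standardized set lives in TS 28.552):

```python
def safe_ratio(numerator, denominator):
    """Percentage ratio that tolerates an empty measurement interval."""
    return 100.0 * numerator / denominator if denominator else 0.0

def compute_kpis(counters):
    """Aggregate raw per-cell counters into the KPIs named above."""
    return {
        "CSSR_pct": safe_ratio(counters["rrc_setup_success"],
                               counters["rrc_setup_attempts"]),
        "CDR_pct": safe_ratio(counters["rrc_abnormal_release"],
                              counters["rrc_connections"]),
        "HOSR_pct": safe_ratio(counters["ho_success"],
                               counters["ho_attempts"]),
    }

kpis = compute_kpis({
    "rrc_setup_success": 9876, "rrc_setup_attempts": 10000,
    "rrc_abnormal_release": 42, "rrc_connections": 8400,
    "ho_success": 1180, "ho_attempts": 1200,
})
# kpis["CSSR_pct"] is approximately 98.76
```

The zero-denominator guard is not pedantry: a cell in maintenance reports zero attempts for hours, and a naive division turns a routine lock into a flood of NaN KPIs and false threshold-crossing alarms.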

Telecommunications Engineering Task Support Matrix

We tested each tool against seven core telecom engineering tasks. Ratings reflect real-world performance on telecom-specific prompts, not generic coding ability.

5G NR Protocol Implementation
  • Cursor: Strong — multi-file indexing handles RRC/PDCP/RLC/MAC layers together
  • Copilot: Moderate — generates message structures but misses cross-layer dependencies
  • Claude Code: Strong — reasons through 3GPP spec references, understands timer interactions and state transition sequences
  • Windsurf: Moderate — basic struct definitions, limited protocol semantics
  • Tabnine: Weak — no telecom protocol knowledge
  • Amazon Q: Basic — minimal 3GPP awareness

SIP/VoIP & IMS Integration
  • Cursor: Strong — indexes SIP stack files and SDP handling together
  • Copilot: Moderate — decent SIP message templates, weak on dialog state machines and Diameter
  • Claude Code: Strong — understands SIP transaction state machines, SDP negotiation semantics, IMS call flows
  • Windsurf: Moderate — basic SIP scaffolding, misses RFC nuances
  • Tabnine: Weak — treats SIP as HTTP-like, no IMS awareness
  • Amazon Q: Basic — Amazon Chime SDK knowledge but not IMS/SIP core

NFV/CNF Orchestration
  • Cursor: Strong — excellent for Kubernetes operator patterns and multi-file Helm charts
  • Copilot: Moderate — good Kubernetes YAML generation, misses telecom-specific lifecycle requirements
  • Claude Code: Strong — reasons through ETSI NFV lifecycle operations, 5GC SBA interactions
  • Windsurf: Moderate — standard Kubernetes patterns, no MANO awareness
  • Tabnine: Basic — generic container patterns
  • Amazon Q: Moderate — good EKS/ECS knowledge, AWS Wavelength awareness for edge

Protocol State Machine Design
  • Cursor: Moderate — generates state machine structure but misses timer interactions
  • Copilot: Basic — simple switch/case FSMs, no understanding of concurrent events or guard conditions
  • Claude Code: Strong — reasons through state transition sequences, identifies race conditions, generates timer management
  • Windsurf: Basic — generic FSM patterns without telecom semantics
  • Tabnine: Weak — no protocol state machine awareness
  • Amazon Q: Basic — AWS Step Functions patterns, not protocol FSMs

Real-Time Signaling Systems
  • Cursor: Moderate — async patterns available but no telecom timing awareness
  • Copilot: Basic — generic async I/O, misses SCTP, GTP-U, PFCP specifics
  • Claude Code: Strong — understands signaling timing constraints, SCTP multi-homing, GTP-U tunnel semantics
  • Windsurf: Basic — WebSocket-level real-time, not signaling-level
  • Tabnine: Weak — no signaling protocol knowledge
  • Amazon Q: Basic — limited to generic event-driven patterns

3GPP Conformance & Testing
  • Cursor: Moderate — generates test structures but no TTCN-3 or conformance test awareness
  • Copilot: Basic — generic unit test patterns, no protocol test methodology
  • Claude Code: Strong — understands conformance test structure, can reference 3GPP test case patterns
  • Windsurf: Basic — standard testing frameworks only
  • Tabnine: Weak — no telecom testing knowledge
  • Amazon Q: Basic — AWS Device Farm for generic testing, no protocol conformance

Network Telemetry & Observability
  • Cursor: Strong — indexes YANG models and telemetry configs across project
  • Copilot: Moderate — decent SNMP/NETCONF scaffolding, weak on KPI formulas
  • Claude Code: Strong — understands YANG model structure, KPI computation from raw counters, alarm correlation logic
  • Windsurf: Moderate — basic monitoring patterns, no telecom KPI knowledge
  • Tabnine: Basic — generic observability patterns
  • Amazon Q: Moderate — good CloudWatch/Timestream integration, limited YANG/gNMI

5G NR Protocol Stack Implementation

The 5G NR protocol stack is defined across multiple 3GPP specifications, and implementing it correctly requires understanding not just individual layer behavior but how layers interact under all operating conditions. The RRC layer is the most complex, managing UE state, measurement configuration, bearer setup, handover execution, and system information acquisition. Here is a production-realistic RRC state machine handler that manages the core state transitions, timer management, and measurement reporting:

# 5G NR RRC State Machine Handler
# Reference: 3GPP TS 38.331 v17.4.0

from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Callable
import time
import struct
import hashlib

class InvalidStateError(Exception):
    """Raised when an RRC message arrives in a state that does not permit it."""

class RRCState(Enum):
    RRC_IDLE = auto()
    RRC_INACTIVE = auto()
    RRC_CONNECTED = auto()

class RRCSubState(Enum):
    # RRC_IDLE sub-states
    IDLE_CAMPED_NORMALLY = auto()
    IDLE_ANY_CELL_SELECTION = auto()
    IDLE_CAMPED_ON_ANY_CELL = auto()
    # RRC_INACTIVE sub-states
    INACTIVE_CAMPED_NORMALLY = auto()
    INACTIVE_RNA_UPDATE = auto()
    # RRC_CONNECTED sub-states
    CONNECTED_NORMAL = auto()
    CONNECTED_HANDOVER = auto()
    CONNECTED_RECONFIG = auto()

@dataclass
class RRCTimer:
    """3GPP-defined RRC timer with name, duration, and callback."""
    name: str
    duration_ms: int
    started_at: Optional[float] = None
    max_retries: int = 0
    retry_count: int = 0
    on_expiry: Optional[Callable] = None

    def start(self):
        self.started_at = time.monotonic()

    def stop(self):
        self.started_at = None
        self.retry_count = 0

    def is_running(self) -> bool:
        return self.started_at is not None

    def is_expired(self) -> bool:
        if self.started_at is None:
            return False
        elapsed_ms = (time.monotonic() - self.started_at) * 1000
        return elapsed_ms >= self.duration_ms

@dataclass
class MeasurementConfig:
    """Measurement configuration from RRCReconfiguration."""
    meas_id: int
    meas_object_id: int  # frequency/carrier to measure
    report_config_id: int
    # Event-triggered reporting (A1-A6, B1-B2)
    event_type: str  # "A1", "A2", "A3", "A5", "B1", etc.
    threshold_rsrp: Optional[int] = None  # dBm
    offset_db: float = 0.0
    hysteresis_db: float = 0.0
    time_to_trigger_ms: int = 0
    report_interval_ms: int = 0
    max_report_cells: int = 8

@dataclass
class NRRRCController:
    """5G NR RRC layer state machine controller.

    Implements TS 38.331 RRC state machine with full timer
    management and measurement reporting.
    """
    state: RRCState = RRCState.RRC_IDLE
    sub_state: RRCSubState = RRCSubState.IDLE_CAMPED_NORMALLY

    # UE identity context
    c_rnti: Optional[int] = None
    i_rnti: Optional[int] = None  # for RRC_INACTIVE
    resume_mac_i: Optional[bytes] = None

    # Security context
    kgnb: Optional[bytes] = None
    nas_security_activated: bool = False
    as_security_activated: bool = False

    # Timer bank (TS 38.331 Section 7.1.1)
    timers: Dict[str, RRCTimer] = field(default_factory=dict)

    # Measurement configuration
    meas_configs: Dict[int, MeasurementConfig] = field(default_factory=dict)

    # Pending procedures
    pending_srb_setup: List[int] = field(default_factory=list)
    pending_drb_setup: List[int] = field(default_factory=list)

    def __post_init__(self):
        self._init_timers()

    def _init_timers(self):
        """Initialize 3GPP-defined RRC timers."""
        # TS 38.331 Section 7.1.1 timer definitions
        self.timers = {
            "T300": RRCTimer("T300", 1000, max_retries=4,
                             on_expiry=self._on_t300_expiry),
            "T301": RRCTimer("T301", 1000, max_retries=0,
                             on_expiry=self._on_t301_expiry),
            "T302": RRCTimer("T302", 0),  # set by SIB1 barring
            "T304": RRCTimer("T304", 100,
                             on_expiry=self._on_t304_expiry),
            "T310": RRCTimer("T310", 1000,
                             on_expiry=self._on_t310_expiry),
            "T311": RRCTimer("T311", 3000,
                             on_expiry=self._on_t311_expiry),
            "T319": RRCTimer("T319", 1000, max_retries=4,
                             on_expiry=self._on_t319_expiry),
            "T320": RRCTimer("T320", 0),  # set by RRCRelease
            "T325": RRCTimer("T325", 0),  # logging duration
            "T330": RRCTimer("T330", 0),  # DC config
            "T331": RRCTimer("T331", 0),  # sidelink
        }

    def handle_rrc_setup_request(self, establishment_cause: int,
                                  ue_identity: bytes):
        """Initiate RRC Connection Setup (TS 38.331 5.3.3).

        Args:
            establishment_cause: as per TS 38.331 (0=emergency,
                1=highPriorityAccess, 2=mt-Access, 3=mo-Signalling,
                4=mo-Data, 5=mo-VoiceCall, 6=mo-VideoCall, 7=mo-SMS)
            ue_identity: 39-bit random value or I-RNTI for resume
        """
        if self.state != RRCState.RRC_IDLE:
            raise InvalidStateError(
                f"RRCSetupRequest requires RRC_IDLE, "
                f"current state: {self.state}")

        # Build RRCSetupRequest message (ASN.1 PER encoded)
        msg = self._encode_rrc_setup_request(
            establishment_cause, ue_identity)

        # Start T300 (guards the setup procedure)
        self.timers["T300"].start()

        # Send on SRB0 (CCCH)
        self._send_on_srb(0, msg)

    def handle_rrc_setup(self, msg: bytes):
        """Process RRCSetup from gNB (TS 38.331 5.3.3.4).

        Configures SRB1, applies radio bearer config, and
        transitions toward RRC_CONNECTED.
        """
        if not self.timers["T300"].is_running():
            # Unexpected RRCSetup outside setup procedure
            return

        # Stop T300
        self.timers["T300"].stop()

        # Decode and apply radioBearerConfig
        config = self._decode_rrc_setup(msg)

        # Establish SRB1
        self._setup_srb(1, config.get("srb1_config"))

        # Apply MAC/PHY configuration (cell group config)
        if "cellGroupConfig" in config:
            self._apply_cell_group_config(
                config["cellGroupConfig"])

        # Assign C-RNTI
        self.c_rnti = config.get("c_rnti")

        # Transition to RRC_CONNECTED
        self._transition_to(RRCState.RRC_CONNECTED,
                           RRCSubState.CONNECTED_NORMAL)

        # Send RRCSetupComplete on SRB1 (carries NAS attach)
        complete_msg = self._encode_rrc_setup_complete(
            config.get("selected_plmn_identity", 0))
        self._send_on_srb(1, complete_msg)

    def handle_rrc_reconfiguration(self, msg: bytes):
        """Process RRCReconfiguration (TS 38.331 5.3.5).

        Handles bearer setup/modification, measurement config,
        handover commands, and SCell addition/release.
        """
        if self.state != RRCState.RRC_CONNECTED:
            raise InvalidStateError(
                "RRCReconfiguration requires RRC_CONNECTED")

        config = self._decode_rrc_reconfiguration(msg)

        # Check if this is a handover command
        if "mobilityControlInfo" in config:
            self._handle_handover(config)
            return

        self.sub_state = RRCSubState.CONNECTED_RECONFIG

        # Apply measurement configuration
        if "measConfig" in config:
            self._apply_meas_config(config["measConfig"])

        # Setup/modify DRBs
        if "drb_ToAddModList" in config:
            for drb in config["drb_ToAddModList"]:
                self._setup_drb(drb["drb_Identity"],
                               drb.get("pdcp_Config"),
                               drb.get("sdap_Config"))

        # Release DRBs
        if "drb_ToReleaseList" in config:
            for drb_id in config["drb_ToReleaseList"]:
                self._release_drb(drb_id)

        # Apply SCell configuration
        if "sCellToAddModList" in config:
            for scell in config["sCellToAddModList"]:
                self._configure_scell(scell)

        self.sub_state = RRCSubState.CONNECTED_NORMAL

        # Send RRCReconfigurationComplete
        self._send_on_srb(1,
            self._encode_rrc_reconfiguration_complete())

    def _handle_handover(self, config: dict):
        """Execute intra-NR handover (TS 38.331 5.3.5.4)."""
        self.sub_state = RRCSubState.CONNECTED_HANDOVER

        mobility = config["mobilityControlInfo"]
        target_pci = mobility["targetPhysCellId"]
        new_c_rnti = mobility.get("newUE_Identity")

        # Derive new KgNB from current KgNB and NCC
        ncc = mobility.get("nextHopChainingCount", 0)
        self.kgnb = self._derive_kgnb_star(
            self.kgnb, target_pci, ncc)

        # Start T304 (handover execution timer)
        t304_ms = mobility.get("t304", 100)
        self.timers["T304"].duration_ms = t304_ms
        self.timers["T304"].start()

        # Perform random access on target cell
        rach_config = mobility.get("rach_ConfigDedicated")
        self._initiate_contention_free_ra(
            target_pci, rach_config)

        # Apply target cell configuration
        if "cellGroupConfig" in config:
            self._apply_cell_group_config(
                config["cellGroupConfig"])

        if new_c_rnti is not None:
            self.c_rnti = new_c_rnti

    def handle_handover_complete(self):
        """Handover completed successfully."""
        if self.sub_state != RRCSubState.CONNECTED_HANDOVER:
            return

        self.timers["T304"].stop()
        self.sub_state = RRCSubState.CONNECTED_NORMAL

        # Send RRCReconfigurationComplete to target gNB
        self._send_on_srb(1,
            self._encode_rrc_reconfiguration_complete())

    def _on_t300_expiry(self):
        """T300 expired: RRC setup attempt failed."""
        timer = self.timers["T300"]
        if timer.retry_count < timer.max_retries:
            timer.retry_count += 1
            # Retry: retransmit RRCSetupRequest on SRB0, then
            # restart T300 (retransmission elided in this sketch)
            timer.start()
        else:
            # All attempts exhausted, inform upper layers
            self._notify_nas("RRC_SETUP_FAILURE",
                            cause="T300_EXPIRED")
            timer.stop()

    def _on_t304_expiry(self):
        """T304 expired: Handover failure (TS 38.331 5.3.5.8).

        UE must initiate RRC re-establishment or fallback
        to source cell if possible.
        """
        self.timers["T304"].stop()
        self.sub_state = RRCSubState.CONNECTED_NORMAL

        # Attempt RRC re-establishment
        self._initiate_rrc_reestablishment(
            cause="handoverFailure")

    def _on_t310_expiry(self):
        """T310 expired: Radio link failure detected.

        T310 was started after N310 consecutive out-of-sync
        indications; expiry means N311 in-sync indications never
        arrived in time. Start T311 for cell selection.
        TS 38.331 Section 5.3.10.3.
        """
        self.timers["T310"].stop()

        # Start T311 (cell selection after RLF)
        self.timers["T311"].start()

        # Suspend all SRBs except SRB0
        self._suspend_srbs()

        # Initiate cell selection procedure
        self._start_cell_selection_after_rlf()

    def _on_t311_expiry(self):
        """T311 expired: No suitable cell found after RLF.

        Transition to RRC_IDLE. Connection is lost.
        """
        self.timers["T311"].stop()
        self._transition_to(RRCState.RRC_IDLE,
                           RRCSubState.IDLE_ANY_CELL_SELECTION)
        self._notify_nas("RRC_CONNECTION_FAILURE",
                        cause="RADIO_LINK_FAILURE")

    def _on_t301_expiry(self):
        """T301 expired: RRC re-establishment failed."""
        self.timers["T301"].stop()
        self._transition_to(RRCState.RRC_IDLE,
                           RRCSubState.IDLE_CAMPED_NORMALLY)
        self._notify_nas("RRC_REESTABLISHMENT_FAILURE",
                        cause="T301_EXPIRED")

    def _on_t319_expiry(self):
        """T319 expired: RRC resume failed."""
        timer = self.timers["T319"]
        if timer.retry_count < timer.max_retries:
            timer.retry_count += 1
            timer.start()
        else:
            timer.stop()
            self._transition_to(RRCState.RRC_IDLE,
                               RRCSubState.IDLE_CAMPED_NORMALLY)
            self._notify_nas("RRC_RESUME_FAILURE",
                            cause="T319_EXPIRED")

    def _apply_meas_config(self, meas_config: dict):
        """Apply measurement configuration (TS 38.331 5.5.2)."""
        # Add/modify measurement objects
        if "measObjectToAddModList" in meas_config:
            for obj in meas_config["measObjectToAddModList"]:
                self._add_meas_object(obj)

        # Add/modify report configurations
        if "reportConfigToAddModList" in meas_config:
            for rpt in meas_config["reportConfigToAddModList"]:
                self._add_report_config(rpt)

        # Add/modify measurement IDs (link object to report)
        if "measIdToAddModList" in meas_config:
            for mid in meas_config["measIdToAddModList"]:
                self.meas_configs[mid["measId"]] = \
                    MeasurementConfig(
                        meas_id=mid["measId"],
                        meas_object_id=mid["measObjectId"],
                        report_config_id=mid["reportConfigId"],
                        event_type=mid.get("event", "A3"))

        # Quantity config (filtering coefficients)
        if "quantityConfig" in meas_config:
            self._apply_quantity_config(
                meas_config["quantityConfig"])

    def evaluate_measurement_events(self, serving_rsrp: float,
                                     neighbor_measurements: dict):
        """Evaluate measurement event triggers (TS 38.331 5.5.4).

        Time-to-trigger filtering is omitted here for brevity.

        Args:
            serving_rsrp: Serving cell RSRP in dBm
            neighbor_measurements: {pci: rsrp_dbm} for neighbors
        """
        for meas_id, config in self.meas_configs.items():
            if config.event_type == "A3":
                # A3: Neighbour becomes offset better than SpCell
                for pci, rsrp in neighbor_measurements.items():
                    mn = rsrp  # Mn: measurement of neighbour
                    mp = serving_rsrp  # Mp: measurement of SpCell
                    off = config.offset_db  # a3-Offset
                    hys = config.hysteresis_db
                    # Entering condition (intra-frequency, so the
                    # Ofn/Ofp frequency offsets are zero):
                    #   Mn - Hys > Mp + Off
                    if mn - hys > mp + off:
                        self._trigger_measurement_report(
                            meas_id, pci, rsrp)

            elif config.event_type == "A2":
                # A2: Serving becomes worse than threshold
                thresh = config.threshold_rsrp
                hys = config.hysteresis_db
                if serving_rsrp + hys < thresh:
                    self._trigger_measurement_report(
                        meas_id, None, serving_rsrp)

    def _transition_to(self, new_state: RRCState,
                       new_sub_state: RRCSubState):
        """Execute state transition with cleanup."""
        old_state = self.state

        # State exit actions
        if old_state == RRCState.RRC_CONNECTED and \
           new_state != RRCState.RRC_CONNECTED:
            self._release_all_drbs()
            self.meas_configs.clear()
            self.as_security_activated = False

        if old_state == RRCState.RRC_CONNECTED and \
           new_state == RRCState.RRC_INACTIVE:
            # Preserve context for resume
            pass  # i-RNTI and resume MAC-I already stored

        if new_state == RRCState.RRC_IDLE:
            self.c_rnti = None
            self.i_rnti = None
            self.kgnb = None
            self._stop_all_timers()

        self.state = new_state
        self.sub_state = new_sub_state

    # ... (placeholder methods for encoding/decoding/transport)
    def _encode_rrc_setup_request(self, cause, identity):
        """ASN.1 PER encode RRCSetupRequest."""
        pass
    def _decode_rrc_setup(self, msg): return {}
    def _decode_rrc_reconfiguration(self, msg): return {}
    def _encode_rrc_setup_complete(self, plmn_id): return b""
    def _encode_rrc_reconfiguration_complete(self): return b""
    def _send_on_srb(self, srb_id, msg): pass
    def _setup_srb(self, srb_id, config): pass
    def _setup_drb(self, drb_id, pdcp, sdap): pass
    def _release_drb(self, drb_id): pass
    def _release_all_drbs(self): pass
    def _configure_scell(self, config): pass
    def _apply_cell_group_config(self, config): pass
    def _notify_nas(self, event, **kwargs): pass
    def _suspend_srbs(self): pass
    def _start_cell_selection_after_rlf(self): pass
    def _initiate_rrc_reestablishment(self, cause): pass
    def _initiate_contention_free_ra(self, pci, config): pass
    def _derive_kgnb_star(self, kgnb, pci, ncc): return b""
    def _add_meas_object(self, obj): pass
    def _add_report_config(self, rpt): pass
    def _apply_quantity_config(self, config): pass
    def _trigger_measurement_report(self, mid, pci, rsrp): pass
    def _stop_all_timers(self):
        for t in self.timers.values():
            t.stop()


class InvalidStateError(Exception):
    pass
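
A quick way to see why the cascade matters is to drive it end to end. The sketch below is self-contained and purely illustrative (the GuardTimer and RlfHandler names are invented for this example, not part of the class above); expiry is fired explicitly so the T310 → T311 → RRC_IDLE chain can be exercised without a real timer wheel:

```python
class GuardTimer:
    """Tiny stand-in for a protocol guard timer."""
    def __init__(self, name, on_expiry):
        self.name = name
        self.on_expiry = on_expiry
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

    def expire(self):
        # In a real stack this is driven by the timer wheel.
        if self.running:
            self.running = False
            self.on_expiry()


class RlfHandler:
    """T310 expiry must start T311; only T311 expiry may move
    the UE to RRC_IDLE (no direct jump)."""
    def __init__(self):
        self.state = "RRC_CONNECTED"
        self.t310 = GuardTimer("T310", self._on_t310_expiry)
        self.t311 = GuardTimer("T311", self._on_t311_expiry)

    def on_n310_out_of_sync(self):
        # N310 consecutive out-of-sync indications start T310
        self.t310.start()

    def on_n311_in_sync(self):
        # Recovery before expiry cancels the cascade
        self.t310.stop()

    def _on_t310_expiry(self):
        # RLF declared: start cell selection guarded by T311
        self.t311.start()

    def _on_t311_expiry(self):
        # No suitable cell found: only now go to RRC_IDLE
        self.state = "RRC_IDLE"


ue = RlfHandler()
ue.on_n310_out_of_sync()
ue.t310.expire()   # RLF declared: T311 starts, still CONNECTED
assert ue.state == "RRC_CONNECTED" and ue.t311.running
ue.t311.expire()   # no cell found: now, and only now, RRC_IDLE
assert ue.state == "RRC_IDLE"
```

The point of the two asserts: after T310 expiry the UE is still RRC_CONNECTED with T311 running, which is exactly the intermediate state that naive generated code skips.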

Key things AI tools get wrong here: generating RRC state machines without T310/T311 cascading (radio link failure requires T310 expiry to trigger T311, and T311 expiry to trigger transition to IDLE — not a direct jump), missing the handover T304 timer that guards the handover execution window, and treating measurement events as simple threshold comparisons without hysteresis and time-to-trigger filtering. Claude Code handles the 3GPP timer semantics well because it can reason through the spec’s procedural descriptions. Cursor excels when the protocol stack files are already in the project context, indexing across RRC/PDCP/RLC/MAC layers. Copilot generates reasonable message structure boilerplate but misses the timer interaction logic that makes a protocol implementation correct under failure conditions.
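
The time-to-trigger filtering that evaluate_measurement_events() above omits can be layered on as a thin wrapper around the entering-condition check. A minimal sketch, assuming intra-frequency measurement (frequency and cell-individual offsets taken as zero) and invented names (A3TriggerFilter, now_ms):

```python
class A3TriggerFilter:
    """A3 event with time-to-trigger: the entering condition must
    hold continuously for the whole TTT window before a report
    fires (TS 38.331 5.5.4.1)."""

    def __init__(self, offset_db: float, hysteresis_db: float,
                 ttt_ms: int):
        self.offset_db = offset_db
        self.hysteresis_db = hysteresis_db
        self.ttt_ms = ttt_ms
        self._entered_at = {}  # pci -> time condition first met

    def evaluate(self, now_ms: int, serving_rsrp: float,
                 neighbor_rsrp: dict) -> list:
        """Return PCIs whose A3 condition has held for >= TTT."""
        reports = []
        for pci, mn in neighbor_rsrp.items():
            # Entering condition: Mn - Hys > Mp + Off
            # (Ofn/Ofp assumed zero for intra-frequency)
            entering = (mn - self.hysteresis_db
                        > serving_rsrp + self.offset_db)
            if entering:
                first = self._entered_at.setdefault(pci, now_ms)
                if now_ms - first >= self.ttt_ms:
                    reports.append(pci)
            else:
                # Condition broke: the TTT clock restarts from zero
                self._entered_at.pop(pci, None)
        return reports


f = A3TriggerFilter(offset_db=3.0, hysteresis_db=1.0, ttt_ms=320)
# Neighbour 101 is 5 dB better: condition met, TTT not yet elapsed
assert f.evaluate(0, -100.0, {101: -95.0}) == []
# Condition still holds 320 ms later: the report fires
assert f.evaluate(320, -100.0, {101: -95.0}) == [101]
```

The reset branch is the part that matters: a single sample that breaks the condition must restart the TTT clock, otherwise brief fading dips trigger spurious handovers.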

SIP/VoIP Message Processing

SIP is a text-based protocol that looks deceptively simple but contains enormous complexity in dialog management, transaction state machines, and SDP offer/answer negotiation. A production SIP stack must handle request routing, Via header manipulation, Record-Route processing for in-dialog requests, and proper transaction retransmission for unreliable transports. Here is a SIP dialog state machine with SDP handling:

# SIP Dialog State Machine with SDP Negotiation
# Reference: RFC 3261, RFC 3264 (Offer/Answer)

from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Tuple
import hashlib
import time
import re

class DialogState(Enum):
    NONE = auto()
    EARLY = auto()       # After 1xx with To-tag
    CONFIRMED = auto()   # After 2xx
    TERMINATED = auto()  # After BYE or error

class TransactionState(Enum):
    # INVITE client transaction (RFC 3261 Section 17.1.1)
    ICT_CALLING = auto()
    ICT_PROCEEDING = auto()
    ICT_COMPLETED = auto()
    ICT_TERMINATED = auto()
    # INVITE server transaction (RFC 3261 Section 17.2.1)
    IST_PROCEEDING = auto()
    IST_COMPLETED = auto()
    IST_CONFIRMED = auto()
    IST_TERMINATED = auto()
    # Non-INVITE client transaction
    NICT_TRYING = auto()
    NICT_PROCEEDING = auto()
    NICT_COMPLETED = auto()
    NICT_TERMINATED = auto()

@dataclass
class SDPSession:
    """SDP session description (RFC 4566)."""
    version: int = 0
    origin_username: str = "-"
    origin_session_id: str = ""
    origin_session_version: int = 0
    connection_address: str = ""
    media_descriptions: List[dict] = field(default_factory=list)

    @classmethod
    def parse(cls, sdp_text: str) -> "SDPSession":
        """Parse SDP from text."""
        session = cls()
        current_media = None

        # Tolerate both CRLF (per RFC 4566) and bare-LF endings
        for line in sdp_text.strip().splitlines():
            if len(line) < 2 or line[1] != "=":
                continue
            field_type = line[0]
            value = line[2:]

            if field_type == "v":
                session.version = int(value)
            elif field_type == "o":
                parts = value.split()
                if len(parts) >= 6:
                    session.origin_username = parts[0]
                    session.origin_session_id = parts[1]
                    session.origin_session_version = int(parts[2])
            elif field_type == "c":
                # c=IN IP4 192.168.1.100
                parts = value.split()
                if len(parts) >= 3:
                    addr = parts[2]
                    if current_media:
                        current_media["connection"] = addr
                    else:
                        session.connection_address = addr
            elif field_type == "m":
                # m=audio 49170 RTP/AVP 0 8 97
                parts = value.split()
                if len(parts) < 4:
                    continue  # malformed media line
                current_media = {
                    "type": parts[0],
                    "port": int(parts[1]),
                    "protocol": parts[2],
                    "formats": parts[3:],
                    "attributes": {},
                    "rtpmap": {},
                    "fmtp": {},
                    "connection": None,
                }
                session.media_descriptions.append(current_media)
            elif field_type == "a" and current_media:
                if ":" in value:
                    attr_name, attr_value = value.split(":", 1)
                    if attr_name == "rtpmap":
                        # a=rtpmap:97 AMR-WB/16000/1
                        pt_rest = attr_value.split(" ", 1)
                        if len(pt_rest) == 2:
                            pt = pt_rest[0]
                            current_media["rtpmap"][pt] = \
                                pt_rest[1]
                    elif attr_name == "fmtp":
                        pt_rest = attr_value.split(" ", 1)
                        if len(pt_rest) == 2:
                            current_media["fmtp"][pt_rest[0]] = \
                                pt_rest[1]
                    else:
                        current_media["attributes"][attr_name] = \
                            attr_value
                else:
                    # Direction attributes: sendrecv, recvonly, etc.
                    current_media["attributes"][value] = ""

        return session

    def negotiate_answer(self, offer: "SDPSession") -> \
            "SDPSession":
        """Generate SDP answer from received offer (RFC 3264).

        For each media line in the offer, include a corresponding
        line in the answer with intersected codecs. If no codecs
        match, set port to 0 (reject media line).
        """
        answer = SDPSession()
        answer.origin_session_id = self.origin_session_id
        answer.origin_session_version = \
            self.origin_session_version + 1
        answer.connection_address = self.connection_address

        for offered_media in offer.media_descriptions:
            answer_media = {
                "type": offered_media["type"],
                "protocol": offered_media["protocol"],
                "attributes": {},
                "rtpmap": {},
                "fmtp": {},
                "connection": self.connection_address,
            }

            # Intersect supported codecs
            our_codecs = self._get_supported_codecs(
                offered_media["type"])
            matched_formats = []

            for fmt in offered_media["formats"]:
                codec_name = offered_media["rtpmap"].get(
                    fmt, fmt)
                if self._codec_supported(
                        offered_media["type"], codec_name):
                    matched_formats.append(fmt)
                    if fmt in offered_media["rtpmap"]:
                        answer_media["rtpmap"][fmt] = \
                            offered_media["rtpmap"][fmt]
                    if fmt in offered_media["fmtp"]:
                        answer_media["fmtp"][fmt] = \
                            offered_media["fmtp"][fmt]

            if matched_formats:
                answer_media["port"] = \
                    self._allocate_rtp_port()
                answer_media["formats"] = matched_formats
                answer_media["attributes"]["sendrecv"] = ""
            else:
                # Reject this media line (port 0)
                answer_media["port"] = 0
                answer_media["formats"] = \
                    offered_media["formats"][:1]

            answer.media_descriptions.append(answer_media)

        return answer

    def _get_supported_codecs(self, media_type):
        return {}
    def _codec_supported(self, media_type, codec_name):
        return True
    def _allocate_rtp_port(self):
        return 0

@dataclass
class SIPDialog:
    """SIP dialog state manager (RFC 3261 Section 12)."""
    call_id: str = ""
    local_tag: str = ""
    remote_tag: str = ""
    local_uri: str = ""
    remote_uri: str = ""
    remote_target: str = ""  # Contact URI
    route_set: List[str] = field(default_factory=list)
    local_cseq: int = 0
    remote_cseq: int = 0
    state: DialogState = DialogState.NONE

    # SDP state
    local_sdp: Optional[SDPSession] = None
    remote_sdp: Optional[SDPSession] = None
    sdp_offer_pending: bool = False

    def create_request(self, method: str) -> dict:
        """Create in-dialog request (RFC 3261 Section 12.2.1.1).

        Request-URI is remote_target (Contact from peer).
        Route headers from route_set.
        """
        self.local_cseq += 1

        request = {
            "method": method,
            "request_uri": self.remote_target,
            "headers": {
                "Via": self._generate_via(),
                "From": f"<{self.local_uri}>"
                        f";tag={self.local_tag}",
                "To": f"<{self.remote_uri}>"
                      f";tag={self.remote_tag}",
                "Call-ID": self.call_id,
                "CSeq": f"{self.local_cseq} {method}",
                "Max-Forwards": "70",
            }
        }

        # Add Route headers from route set
        if self.route_set:
            # If first route is loose-routing (lr param),
            # use route set as-is. Otherwise, first route
            # becomes Request-URI (strict routing, RFC 3261
            # Section 12.2.1.1).
            first_route = self.route_set[0]
            if ";lr" in first_route.lower():
                request["headers"]["Route"] = \
                    ", ".join(self.route_set)
            else:
                request["request_uri"] = first_route
                remaining = self.route_set[1:]
                remaining.append(f"<{self.remote_target}>")
                request["headers"]["Route"] = \
                    ", ".join(remaining)

        return request

    def process_response(self, response: dict):
        """Update dialog state from response."""
        status_code = response.get("status_code", 0)

        if 100 <= status_code <= 199:
            to_tag = self._extract_tag(
                response["headers"].get("To", ""))
            if to_tag and self.state == DialogState.NONE:
                self.remote_tag = to_tag
                self.state = DialogState.EARLY
                self._update_route_set(response)
                self._update_remote_target(response)

        elif 200 <= status_code <= 299:
            to_tag = self._extract_tag(
                response["headers"].get("To", ""))
            if to_tag:
                self.remote_tag = to_tag
            self.state = DialogState.CONFIRMED
            self._update_route_set(response)
            self._update_remote_target(response)

            # Process SDP answer if present
            if "body" in response and \
               "application/sdp" in response.get(
                   "headers", {}).get("Content-Type", ""):
                self.remote_sdp = SDPSession.parse(
                    response["body"])
                self.sdp_offer_pending = False

        elif 300 <= status_code <= 699:
            if self.state == DialogState.EARLY:
                self.state = DialogState.TERMINATED

    def send_bye(self):
        """Terminate dialog with BYE."""
        if self.state not in (DialogState.CONFIRMED,
                              DialogState.EARLY):
            return None

        request = self.create_request("BYE")
        self.state = DialogState.TERMINATED
        return request

    def handle_reinvite(self, request: dict):
        """Process re-INVITE for hold/resume/codec change."""
        if self.state != DialogState.CONFIRMED:
            return {"status_code": 481,
                    "reason": "Call/Transaction Does Not Exist"}

        # Validate CSeq
        cseq_num = int(
            request["headers"]["CSeq"].split()[0])
        if cseq_num <= self.remote_cseq:
            # RFC 3261 Section 12.2.2: reject an out-of-order
            # CSeq with a 500 response
            return {"status_code": 500,
                    "reason": "CSeq out of order"}
        self.remote_cseq = cseq_num

        # Process SDP offer
        if "body" in request:
            offer = SDPSession.parse(request["body"])
            self.remote_sdp = offer

            # Generate SDP answer
            if self.local_sdp:
                answer = self.local_sdp.negotiate_answer(offer)
                return {
                    "status_code": 200,
                    "reason": "OK",
                    "body": answer,
                    "headers": {
                        "Content-Type": "application/sdp"
                    }
                }

        return {"status_code": 200, "reason": "OK"}

    def _extract_tag(self, header_value: str) -> Optional[str]:
        match = re.search(r";tag=([^\s;,]+)", header_value)
        return match.group(1) if match else None

    def _generate_via(self):
        # Branch must start with the magic cookie z9hG4bK and be
        # unique per transaction (RFC 3261 Section 8.1.1.7)
        seed = f"{time.monotonic()}{self.call_id}{self.local_cseq}"
        branch = "z9hG4bK" + hashlib.md5(
            seed.encode()).hexdigest()[:16]
        return f"SIP/2.0/UDP 0.0.0.0:5060;branch={branch}"

    def _update_route_set(self, response):
        rr = response.get("headers", {}).get("Record-Route")
        if rr:
            # UAC builds the route set from the response's
            # Record-Route headers in reverse order
            # (RFC 3261 Section 12.1.2)
            routes = [r.strip() for r in rr.split(",")]
            self.route_set = list(reversed(routes))

    def _update_remote_target(self, response):
        contact = response.get("headers", {}).get("Contact")
        if contact:
            match = re.search(r"<([^>]+)>", contact)
            if match:
                self.remote_target = match.group(1)

AI tools consistently make three mistakes with SIP: treating it as a stateless request-response protocol (like HTTP), ignoring the Record-Route/Route header chain required for in-dialog request routing, and generating SDP answers that simply echo the offer instead of intersecting codec capabilities. Claude Code understands SIP dialog semantics because it can reason through the RFC’s procedural rules. Cursor performs well when the SIP stack code is already in the project — it indexes across dialog, transaction, and transport layers. Copilot generates syntactically correct SIP message construction but misses the branch parameter magic cookie requirement (z9hG4bK prefix, RFC 3261 Section 8.1.1.7) and the strict vs. loose routing distinction.
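
The transaction-layer retransmission that the dialog code above delegates downward follows a fixed schedule over unreliable transports: Timer A starts at T1 (500 ms default) and doubles on each firing, while Timer B (64×T1) caps the whole INVITE client transaction. A minimal sketch of that schedule per RFC 3261 Section 17.1.1.2 (the function name is illustrative):

```python
T1_MS = 500            # default RTT estimate (RFC 3261)
TIMER_B_MS = 64 * T1_MS  # transaction timeout

def invite_retransmission_schedule():
    """Absolute times (ms) at which an INVITE is retransmitted
    over UDP before Timer B declares transaction timeout."""
    times = []
    interval = T1_MS
    elapsed = T1_MS        # first Timer A firing
    while elapsed < TIMER_B_MS:
        times.append(elapsed)
        interval *= 2      # Timer A doubles on each firing
        elapsed += interval
    return times

# Retransmissions at 0.5, 1.5, 3.5, 7.5, 15.5 and 31.5 seconds,
# then Timer B (32 s) fires and the transaction terminates.
schedule = invite_retransmission_schedule()
assert schedule == [500, 1500, 3500, 7500, 15500, 31500]
```

Generated SIP code that retransmits at a fixed interval, or that retransmits over TCP at all, fails interop testing against stacks that implement this backoff correctly.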

Network Function Lifecycle Management

Cloud-native 5G network functions require lifecycle management that goes beyond standard Kubernetes deployment patterns. A telecom CNF must support graceful session draining during scale-in (you cannot kill a pod that has active VoNR calls), health monitoring that understands signaling protocol state (a pod can be “Running” in Kubernetes terms but have lost its NRF registration), and auto-scaling based on telecom-specific KPIs rather than generic CPU/memory metrics:

# Cloud-Native Network Function Lifecycle Manager
# 5G Core CNF with graceful draining and telecom KPI scaling

import asyncio
import aiohttp
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from enum import Enum, auto

class NFState(Enum):
    INSTANTIATING = auto()
    REGISTERED = auto()      # Registered with NRF
    ACTIVE = auto()          # Serving traffic
    DRAINING = auto()        # Graceful shutdown
    DEREGISTERING = auto()   # Removing from NRF
    TERMINATED = auto()

@dataclass
class ActiveSession:
    """Tracks an active signaling session on this NF instance."""
    session_id: str
    session_type: str       # "pdu_session", "registration", etc.
    created_at: float
    last_activity: float
    ue_supi: str
    priority: int = 0       # Emergency sessions get higher priority

@dataclass
class NFInstance:
    """Represents a single CNF instance (pod)."""
    instance_id: str
    nf_type: str            # "AMF", "SMF", "UPF", "PCF", etc.
    state: NFState = NFState.INSTANTIATING
    api_endpoint: str = ""
    nrf_registration_id: Optional[str] = None
    active_sessions: Dict[str, ActiveSession] = field(
        default_factory=dict)
    health_check_failures: int = 0
    last_heartbeat: float = 0.0

class CNFLifecycleManager:
    """Manages lifecycle of 5G Core cloud-native network functions.

    Implements ETSI NFV SOL005/SOL006 lifecycle operations adapted
    for Kubernetes-native deployment with 3GPP NRF integration.
    """

    def __init__(self, nf_type: str, nrf_url: str,
                 namespace: str = "5gc"):
        self.nf_type = nf_type
        self.nrf_url = nrf_url
        self.namespace = namespace
        self.instances: Dict[str, NFInstance] = {}
        self.drain_timeout_s = 300  # 5 min max drain
        self.heartbeat_interval_s = 10
        self.max_health_failures = 3

        # Telecom-specific scaling thresholds
        self.scale_config = {
            "AMF": {
                "metric": "active_registrations",
                "scale_up_threshold": 50000,
                "scale_down_threshold": 20000,
                "min_replicas": 2,
                "max_replicas": 20,
                "cooldown_s": 120,
            },
            "SMF": {
                "metric": "active_pdu_sessions",
                "scale_up_threshold": 100000,
                "scale_down_threshold": 40000,
                "min_replicas": 2,
                "max_replicas": 30,
                "cooldown_s": 180,
            },
            "UPF": {
                "metric": "throughput_gbps",
                "scale_up_threshold": 80,  # 80 Gbps per UPF
                "scale_down_threshold": 30,
                "min_replicas": 2,
                "max_replicas": 50,
                "cooldown_s": 60,
            },
        }

    async def instantiate(self, instance_id: str,
                          config: dict) -> NFInstance:
        """Instantiate a new NF instance.

        Steps:
        1. Create instance record
        2. Deploy pod via Kubernetes API
        3. Wait for readiness
        4. Register with NRF (TS 29.510)
        5. Transition to ACTIVE
        """
        instance = NFInstance(
            instance_id=instance_id,
            nf_type=self.nf_type,
            api_endpoint=config.get("api_endpoint", ""),
        )
        self.instances[instance_id] = instance

        # Deploy pod (simplified K8s API call)
        await self._deploy_pod(instance, config)

        # Wait for pod readiness with timeout
        ready = await self._wait_for_ready(
            instance, timeout_s=120)
        if not ready:
            instance.state = NFState.TERMINATED
            raise NFLifecycleError(
                f"Instance {instance_id} failed readiness check")

        # Register with NRF (3GPP TS 29.510)
        nrf_reg = await self._register_with_nrf(instance)
        instance.nrf_registration_id = nrf_reg
        instance.state = NFState.REGISTERED

        # Start heartbeat loop; hold a reference so asyncio does
        # not garbage-collect the task mid-flight
        self._heartbeat_tasks = getattr(
            self, "_heartbeat_tasks", [])
        self._heartbeat_tasks.append(asyncio.create_task(
            self._heartbeat_loop(instance)))

        instance.state = NFState.ACTIVE
        return instance

    async def graceful_drain(self, instance_id: str):
        """Gracefully drain an NF instance before termination.

        Critical for telecom: cannot kill pods with active
        signaling sessions. Must:
        1. Deregister from NRF (stop receiving new sessions)
        2. Wait for active sessions to complete or timeout
        3. Force-migrate remaining sessions if drain times out
        4. Terminate pod
        """
        instance = self.instances.get(instance_id)
        if not instance or instance.state == NFState.TERMINATED:
            return

        instance.state = NFState.DRAINING

        # Step 1: Deregister from NRF to stop new traffic
        await self._deregister_from_nrf(instance)
        instance.state = NFState.DEREGISTERING

        # Step 2: Wait for sessions to drain naturally
        drain_start = time.monotonic()
        while instance.active_sessions:
            elapsed = time.monotonic() - drain_start
            if elapsed > self.drain_timeout_s:
                break

            # Check for completed sessions
            completed = []
            for sid, session in instance.active_sessions.items():
                idle_time = time.monotonic() - session.last_activity
                if idle_time > 60:  # 60s idle = likely done
                    completed.append(sid)

            for sid in completed:
                del instance.active_sessions[sid]

            remaining = len(instance.active_sessions)
            if remaining > 0:
                # Log progress
                await self._log_drain_progress(
                    instance_id, remaining,
                    self.drain_timeout_s - elapsed)

            await asyncio.sleep(5)

        # Step 3: Force-migrate remaining sessions
        if instance.active_sessions:
            emergency_count = sum(
                1 for s in instance.active_sessions.values()
                if s.priority > 0)

            if emergency_count > 0:
                # Emergency sessions: migrate, do not drop
                await self._migrate_sessions(
                    instance,
                    [s for s in instance.active_sessions.values()
                     if s.priority > 0])

            # Non-emergency: force release with cause
            for session in list(
                    instance.active_sessions.values()):
                if session.priority == 0:
                    await self._force_release_session(
                        instance, session,
                        cause="NF_INSTANCE_TERMINATING")

        # Step 4: Terminate
        await self._terminate_pod(instance)
        instance.state = NFState.TERMINATED

    async def evaluate_scaling(self):
        """Evaluate telecom KPI-based scaling decisions.

        Unlike generic HPA which scales on CPU/memory, telecom
        NFs scale on domain-specific metrics: active registrations
        for AMF, active PDU sessions for SMF, throughput for UPF.
        """
        config = self.scale_config.get(self.nf_type)
        if not config:
            return

        active_instances = [
            i for i in self.instances.values()
            if i.state == NFState.ACTIVE]
        current_count = len(active_instances)

        # Collect aggregate metric
        total_metric = 0
        for instance in active_instances:
            metric_val = await self._get_instance_metric(
                instance, config["metric"])
            total_metric += metric_val

        per_instance = (total_metric / current_count
                       if current_count > 0 else 0)

        # Scale up: average per instance exceeds threshold
        if per_instance > config["scale_up_threshold"]:
            if current_count < config["max_replicas"]:
                new_id = f"{self.nf_type.lower()}-" \
                         f"{int(time.time())}"
                await self.instantiate(new_id, {
                    "api_endpoint": self._generate_endpoint(
                        new_id)
                })
                return {"action": "scale_up",
                        "new_count": current_count + 1,
                        "reason": f"{config['metric']}="
                                  f"{per_instance:.0f} > "
                                  f"{config['scale_up_threshold']}"}

        # Scale down: average below threshold, drain oldest
        elif per_instance < config["scale_down_threshold"]:
            if current_count > config["min_replicas"]:
                # Select instance with fewest active sessions
                target = min(active_instances,
                            key=lambda i: len(i.active_sessions))
                await self.graceful_drain(target.instance_id)
                return {"action": "scale_down",
                        "new_count": current_count - 1,
                        "reason": f"{config['metric']}="
                                  f"{per_instance:.0f} < "
                                  f"{config['scale_down_threshold']}"}

        return {"action": "none", "current_count": current_count}

    async def _register_with_nrf(self, instance: NFInstance) \
            -> str:
        """Register NF instance with NRF (TS 29.510 Section 5.2.2.2).

        PUT /nnrf-nfm/v1/nf-instances/{nfInstanceId}
        """
        nf_profile = {
            "nfInstanceId": instance.instance_id,
            "nfType": self.nf_type,
            "nfStatus": "REGISTERED",
            "ipv4Addresses": [
                instance.api_endpoint.split(":")[0]],
            "nfServices": [{
                "serviceInstanceId": f"{instance.instance_id}"
                                     f"-svc-1",
                "serviceName": self._get_service_name(),
                "versions": [{"apiVersionInUri": "v1",
                              "apiFullVersion": "1.0.0"}],
                "scheme": "https",
                "nfServiceStatus": "REGISTERED",
            }],
            "heartBeatTimer": self.heartbeat_interval_s,
        }

        async with aiohttp.ClientSession() as session:
            url = (f"{self.nrf_url}/nnrf-nfm/v1/nf-instances/"
                   f"{instance.instance_id}")
            async with session.put(url,
                                   json=nf_profile) as resp:
                if resp.status in (200, 201):
                    return instance.instance_id
                raise NFLifecycleError(
                    f"NRF registration failed: {resp.status}")

    async def _deregister_from_nrf(self, instance: NFInstance):
        """Deregister from NRF (TS 29.510 Section 5.2.2.4)."""
        async with aiohttp.ClientSession() as session:
            url = (f"{self.nrf_url}/nnrf-nfm/v1/nf-instances/"
                   f"{instance.instance_id}")
            async with session.delete(url) as resp:
                if resp.status not in (200, 204, 404):
                    raise NFLifecycleError(
                        f"NRF deregistration failed: "
                        f"{resp.status}")

    async def _heartbeat_loop(self, instance: NFInstance):
        """NRF heartbeat (TS 29.510 Section 5.2.2.3)."""
        while instance.state in (NFState.REGISTERED,
                                  NFState.ACTIVE):
            try:
                async with aiohttp.ClientSession() as session:
                    url = (f"{self.nrf_url}/nnrf-nfm/v1/"
                           f"nf-instances/"
                           f"{instance.instance_id}")
                    patch = [{"op": "replace",
                              "path": "/nfStatus",
                              "value": "REGISTERED"}]
                    async with session.patch(
                            url, json=patch) as resp:
                        if resp.status == 200:
                            instance.health_check_failures = 0
                            instance.last_heartbeat = \
                                time.monotonic()
                        else:
                            instance.health_check_failures += 1
            except Exception:
                instance.health_check_failures += 1

            if instance.health_check_failures >= \
                    self.max_health_failures:
                # Self-heal: re-register with NRF (guarded so
                # one failed attempt cannot kill the heartbeat task)
                try:
                    await self._register_with_nrf(instance)
                    instance.health_check_failures = 0
                except NFLifecycleError:
                    pass

            await asyncio.sleep(self.heartbeat_interval_s)

    def _get_service_name(self) -> str:
        service_map = {
            "AMF": "namf-comm",
            "SMF": "nsmf-pdusession",
            "UPF": "nupf-ee",
            "PCF": "npcf-smpolicycontrol",
            "UDM": "nudm-sdm",
            "AUSF": "nausf-auth",
            "NRF": "nnrf-nfm",
            "NSSF": "nnssf-nsselection",
        }
        return service_map.get(self.nf_type, f"n{self.nf_type.lower()}")

    # Stubs for infrastructure operations
    async def _deploy_pod(self, inst, config): pass
    async def _wait_for_ready(self, inst, timeout_s): return True
    async def _terminate_pod(self, inst): pass
    async def _get_instance_metric(self, inst, metric): return 0
    async def _migrate_sessions(self, inst, sessions): pass
    async def _force_release_session(self, inst, session,
                                      cause=""): pass
    async def _log_drain_progress(self, iid, remaining,
                                   time_left): pass
    def _generate_endpoint(self, new_id): return ""

class NFLifecycleError(Exception):
    pass

The critical insight that AI tools miss: standard Kubernetes graceful shutdown (preStop hooks with a fixed timeout) is insufficient for telecom. A pod running an SMF instance may have 100,000 active PDU sessions, and killing that pod means 100,000 users lose data connectivity simultaneously. The drain procedure must first deregister from NRF (so no new sessions arrive), then wait for sessions to complete naturally, then force-migrate high-priority sessions (emergency calls), and only then terminate. Claude Code handles this well because it can reason about the cascading effects of pod termination on active signaling sessions. Cursor excels at multi-file lifecycle management when Helm charts, operator code, and NF application code are all indexed together.
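One way to wire that drain sequence into Kubernetes is to point the preStop hook at a drain endpoint on the NF and size terminationGracePeriodSeconds to the worst-case drain time rather than the 30-second default. A minimal sketch — the /drain path, port, and timeout values are illustrative, not from any real chart:

```python
# Sketch: pod spec whose preStop hook triggers NF session drain.
# The /drain endpoint and DRAIN_TIMEOUT_S are illustrative values.
DRAIN_TIMEOUT_S = 300  # worst-case session drain, not the 30s default

def nf_pod_spec(name: str, image: str) -> dict:
    """Build a pod spec that gives the NF time to drain sessions."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            # Must exceed the drain deadline, or the kubelet
            # SIGKILLs the NF with sessions still active.
            "terminationGracePeriodSeconds": DRAIN_TIMEOUT_S + 30,
            "containers": [{
                "name": "nf",
                "image": image,
                "lifecycle": {"preStop": {"httpGet": {
                    # Hypothetical endpoint: deregisters from NRF,
                    # then blocks until sessions are drained.
                    "path": "/drain",
                    "port": 8080,
                }}},
            }],
        },
    }

spec = nf_pod_spec("smf-0", "example/smf:1.0")
assert spec["spec"]["terminationGracePeriodSeconds"] > DRAIN_TIMEOUT_S
```

The point of the sketch is the ordering guarantee: Kubernetes runs the preStop hook before sending SIGTERM, so the NRF deregistration happens while the pod is still serving, and the grace period bounds the wait rather than truncating it.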

Protocol State Machine Implementation

The 5G NAS (Non-Access Stratum) PDU Session Establishment procedure involves a multi-step state machine with timers, guard conditions, and error recovery paths. Getting this wrong means users cannot establish data connectivity. Here is a formal state machine for PDU Session Establishment as specified in TS 24.501:

# 5G NAS PDU Session Establishment State Machine
# Reference: 3GPP TS 24.501 Section 6.4.1

from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Dict, Callable
import time

class PDUSessionState(Enum):
    PDU_SESSION_INACTIVE = auto()
    PDU_SESSION_ESTABLISHMENT_PENDING = auto()
    PDU_SESSION_ACTIVE = auto()
    PDU_SESSION_MODIFICATION_PENDING = auto()
    PDU_SESSION_RELEASE_PENDING = auto()

class PDUSessionType(Enum):
    IPV4 = 1
    IPV6 = 2
    IPV4V6 = 3
    UNSTRUCTURED = 4
    ETHERNET = 5

class EstablishmentCause(Enum):
    NORMAL = auto()
    EMERGENCY = auto()
    HIGH_PRIORITY = auto()

@dataclass
class PDUSessionContext:
    """PDU Session context maintained at UE."""
    pdu_session_id: int  # 1-15
    pti: int             # Procedure Transaction Identity (1-254)
    session_type: PDUSessionType = PDUSessionType.IPV4V6
    ssc_mode: int = 1    # SSC mode 1, 2, or 3
    dnn: str = ""        # Data Network Name
    s_nssai: Optional[dict] = None  # Single NSSAI

    # Assigned by network
    pdu_address: Optional[str] = None
    authorized_qos_rules: Dict[int, dict] = field(
        default_factory=dict)
    authorized_qos_flow_descriptions: Dict[int, dict] = field(
        default_factory=dict)
    session_ambr: Optional[dict] = None  # DL/UL AMBR

    # State
    state: PDUSessionState = PDUSessionState.PDU_SESSION_INACTIVE

    # 5GSM cause from network
    last_cause: Optional[int] = None

@dataclass
class NASTimer:
    name: str
    duration_ms: int
    started_at: Optional[float] = None
    on_expiry: Optional[Callable] = None
    max_retransmissions: int = 0
    retransmission_count: int = 0

    def start(self):
        self.started_at = time.monotonic()

    def stop(self):
        self.started_at = None
        self.retransmission_count = 0

    def is_expired(self) -> bool:
        if self.started_at is None:
            return False
        return ((time.monotonic() - self.started_at) * 1000
                >= self.duration_ms)

class PDUSessionManager:
    """Manages PDU Session lifecycle per TS 24.501.

    Handles establishment, modification, and release with
    full timer management, retransmission, and error recovery.
    """

    MAX_PDU_SESSIONS = 15

    def __init__(self):
        self.sessions: Dict[int, PDUSessionContext] = {}
        self.timers: Dict[str, NASTimer] = {}
        self._next_pti = 1

        # Procedure timer templates (TS 24.501 Table 10.3.1):
        # T3580 guards establishment, T3581 modification,
        # T3582 release. Per-session copies are created on demand.
        self.t3580 = NASTimer("T3580", 16000,
                               max_retransmissions=4)
        self.t3581 = NASTimer("T3581", 16000,
                               max_retransmissions=4)
        self.t3582 = NASTimer("T3582", 16000,
                               max_retransmissions=4)

    def _allocate_pti(self) -> int:
        """Allocate Procedure Transaction Identity (1-254)."""
        pti = self._next_pti
        self._next_pti = (self._next_pti % 254) + 1
        return pti

    def initiate_establishment(
            self, pdu_session_id: int,
            session_type: PDUSessionType,
            dnn: str,
            s_nssai: Optional[dict] = None,
            cause: EstablishmentCause = EstablishmentCause.NORMAL
    ) -> PDUSessionContext:
        """UE-requested PDU Session Establishment (TS 24.501 6.4.1.2).

        Preconditions:
        - UE in 5GMM-REGISTERED state
        - PDU session ID not already active
        - Not barred by T3396/T3584 backoff timer

        State: INACTIVE -> ESTABLISHMENT_PENDING
        """
        # Guard: session ID already in use
        if pdu_session_id in self.sessions:
            existing = self.sessions[pdu_session_id]
            if existing.state != \
                    PDUSessionState.PDU_SESSION_INACTIVE:
                raise SessionError(
                    f"PDU session {pdu_session_id} already "
                    f"in state {existing.state}")

        # Guard: max sessions
        active = sum(1 for s in self.sessions.values()
                     if s.state != \
                     PDUSessionState.PDU_SESSION_INACTIVE)
        if active >= self.MAX_PDU_SESSIONS:
            raise SessionError(
                "Maximum PDU sessions reached")

        # Guard: DNN barred by a running backoff timer
        # (started in handle_establishment_reject)
        backoff_key = f"backoff_{pdu_session_id}_{dnn}"
        backoff = self.timers.get(backoff_key)
        if backoff and backoff.started_at is not None \
                and not backoff.is_expired():
            raise SessionError(
                f"DNN {dnn} barred by backoff timer")

        # Create session context
        pti = self._allocate_pti()
        session = PDUSessionContext(
            pdu_session_id=pdu_session_id,
            pti=pti,
            session_type=session_type,
            dnn=dnn,
            s_nssai=s_nssai,
        )

        # Build PDU SESSION ESTABLISHMENT REQUEST
        msg = self._build_establishment_request(session)

        # Transition: INACTIVE -> ESTABLISHMENT_PENDING
        session.state = \
            PDUSessionState.PDU_SESSION_ESTABLISHMENT_PENDING
        self.sessions[pdu_session_id] = session

        # Start T3580 (guards establishment procedure)
        timer_key = f"T3580_{pdu_session_id}"
        self.timers[timer_key] = NASTimer(
            "T3580", 16000, max_retransmissions=4,
            on_expiry=lambda: self._on_t3580_expiry(
                pdu_session_id))
        self.timers[timer_key].start()

        # Send via NAS transport (encapsulated in UL NAS TRANSPORT)
        self._send_nas_sm_message(
            pdu_session_id, pti, msg)

        return session

    def handle_establishment_accept(
            self, pdu_session_id: int, msg: dict):
        """Process PDU SESSION ESTABLISHMENT ACCEPT (TS 24.501 6.4.1.3).

        State: ESTABLISHMENT_PENDING -> ACTIVE
        """
        session = self.sessions.get(pdu_session_id)
        if not session:
            return

        if session.state != \
                PDUSessionState.PDU_SESSION_ESTABLISHMENT_PENDING:
            # Unexpected accept in current state
            return

        # Stop T3580
        timer_key = f"T3580_{pdu_session_id}"
        if timer_key in self.timers:
            self.timers[timer_key].stop()

        # Apply authorized parameters from network
        session.session_type = PDUSessionType(
            msg.get("selected_pdu_session_type",
                     session.session_type.value))
        session.ssc_mode = msg.get("selected_ssc_mode",
                                    session.ssc_mode)

        # PDU address assignment
        if "pdu_address" in msg:
            session.pdu_address = msg["pdu_address"]

        # QoS rules (mandatory IE)
        if "authorized_qos_rules" in msg:
            session.authorized_qos_rules = \
                msg["authorized_qos_rules"]

        # Session AMBR (mandatory IE)
        if "session_ambr" in msg:
            session.session_ambr = msg["session_ambr"]

        # QoS flow descriptions (optional)
        if "authorized_qos_flow_descriptions" in msg:
            session.authorized_qos_flow_descriptions = \
                msg["authorized_qos_flow_descriptions"]

        # Transition: ESTABLISHMENT_PENDING -> ACTIVE
        session.state = PDUSessionState.PDU_SESSION_ACTIVE

        # Notify upper layers (IP stack can now use this PDU session)
        # pdu_address passed as keyword: the notifier signature
        # is (sid, event, **kwargs)
        self._notify_data_plane(pdu_session_id, "ACTIVATED",
                                address=session.pdu_address)

    def handle_establishment_reject(
            self, pdu_session_id: int, msg: dict):
        """Process PDU SESSION ESTABLISHMENT REJECT (TS 24.501 6.4.1.4).

        State: ESTABLISHMENT_PENDING -> INACTIVE

        Must handle cause values:
        - #26: Insufficient resources
        - #27: Missing or unknown DNN
        - #28: Unknown PDU session type
        - #29: User authentication failed
        - #31: Request rejected, unspecified
        - #33: Requested service option not subscribed
        - #36: Regular deactivation
        - #43: Invalid PDU session identity
        """
        session = self.sessions.get(pdu_session_id)
        if not session:
            return

        # Stop T3580
        timer_key = f"T3580_{pdu_session_id}"
        if timer_key in self.timers:
            self.timers[timer_key].stop()

        cause = msg.get("5gsm_cause", 31)
        session.last_cause = cause

        # Handle backoff timer if present (TS 24.501 uses T3396
        # for per-DNN congestion, T3584/T3585 per S-NSSAI;
        # simplified here to one per-(session, DNN) timer)
        if "back_off_timer" in msg:
            backoff_ms = msg["back_off_timer"] * 1000
            backoff_key = f"backoff_{pdu_session_id}_{session.dnn}"
            self.timers[backoff_key] = NASTimer(
                "T3584", backoff_ms)
            self.timers[backoff_key].start()

        # Transition: ESTABLISHMENT_PENDING -> INACTIVE
        session.state = PDUSessionState.PDU_SESSION_INACTIVE

        # Notify upper layers
        self._notify_data_plane(pdu_session_id, "REJECTED",
                                cause=cause)

    def _on_t3580_expiry(self, pdu_session_id: int):
        """T3580 expired during PDU Session Establishment.

        TS 24.501 Section 6.4.1.6:
        If T3580 expires, retransmit the request up to 4 times.
        After max retransmissions, abort and move to INACTIVE.
        """
        timer_key = f"T3580_{pdu_session_id}"
        timer = self.timers.get(timer_key)
        session = self.sessions.get(pdu_session_id)

        if not timer or not session:
            return

        if timer.retransmission_count < \
                timer.max_retransmissions:
            timer.retransmission_count += 1
            timer.start()  # Restart timer

            # Retransmit the establishment request
            msg = self._build_establishment_request(session)
            self._send_nas_sm_message(
                pdu_session_id, session.pti, msg)
        else:
            # Max retransmissions reached, abort
            timer.stop()
            session.state = \
                PDUSessionState.PDU_SESSION_INACTIVE
            self._notify_data_plane(
                pdu_session_id, "ESTABLISHMENT_FAILED",
                cause="T3580_MAX_RETRANSMISSIONS")

    def initiate_release(self, pdu_session_id: int,
                         cause: int = 36):
        """UE-requested PDU Session Release (TS 24.501 6.4.3.2).

        State: ACTIVE -> RELEASE_PENDING
        """
        session = self.sessions.get(pdu_session_id)
        if not session or \
                session.state != PDUSessionState.PDU_SESSION_ACTIVE:
            return

        pti = self._allocate_pti()
        session.pti = pti

        # Build release request
        msg = {"message_type": "PDU_SESSION_RELEASE_REQUEST",
               "5gsm_cause": cause}

        session.state = \
            PDUSessionState.PDU_SESSION_RELEASE_PENDING

        # Start T3582
        timer_key = f"T3582_{pdu_session_id}"
        self.timers[timer_key] = NASTimer(
            "T3582", 16000, max_retransmissions=4,
            on_expiry=lambda: self._on_t3582_expiry(
                pdu_session_id))
        self.timers[timer_key].start()

        self._send_nas_sm_message(
            pdu_session_id, pti, msg)

    def handle_release_command(self, pdu_session_id: int,
                                msg: dict):
        """Network-initiated release (TS 24.501 6.4.3.3).

        State: any -> INACTIVE
        """
        session = self.sessions.get(pdu_session_id)
        if not session:
            return

        # Stop any running timers for this session. Match the
        # session-ID segment exactly: a substring test would also
        # stop timers for e.g. session 15 when releasing session 1.
        sid = str(pdu_session_id)
        for key in list(self.timers.keys()):
            parts = key.split("_")
            if len(parts) >= 2 and parts[1] == sid:
                self.timers[key].stop()

        session.last_cause = msg.get("5gsm_cause", 36)
        session.state = PDUSessionState.PDU_SESSION_INACTIVE

        # Deactivate data plane
        self._notify_data_plane(pdu_session_id, "RELEASED",
                                cause=session.last_cause)

        # Send PDU SESSION RELEASE COMPLETE
        complete = {"message_type":
                    "PDU_SESSION_RELEASE_COMPLETE"}
        self._send_nas_sm_message(
            pdu_session_id, session.pti, complete)

    def _on_t3582_expiry(self, pdu_session_id: int):
        """T3582 expired during release."""
        timer_key = f"T3582_{pdu_session_id}"
        timer = self.timers.get(timer_key)
        session = self.sessions.get(pdu_session_id)
        if not timer or not session:
            return
        if timer.retransmission_count < \
                timer.max_retransmissions:
            timer.retransmission_count += 1
            timer.start()
            msg = {"message_type":
                   "PDU_SESSION_RELEASE_REQUEST",
                   "5gsm_cause": 36}
            self._send_nas_sm_message(
                pdu_session_id, session.pti, msg)
        else:
            timer.stop()
            session.state = \
                PDUSessionState.PDU_SESSION_INACTIVE
            self._notify_data_plane(
                pdu_session_id, "RELEASE_TIMEOUT")

    # Stubs
    def _build_establishment_request(self, session): return {}
    def _send_nas_sm_message(self, sid, pti, msg): pass
    def _notify_data_plane(self, sid, event, **kwargs): pass

class SessionError(Exception):
    pass

The state machine correctness requirements here are absolute. Every state transition has preconditions (guard conditions that must be true), actions (messages sent, timers started/stopped, context updated), and post-conditions (the new state and its invariants). AI tools commonly generate state machines that handle the happy path (establishment succeeds) but miss the failure paths: T3580 expiry and retransmission, rejection with backoff timer, network-initiated release during establishment. Claude Code is strongest here because it can reason through the procedural specification text and identify the complete set of state transitions. Cursor helps when the state machine code, timer management, and message encoding are in separate files that need cross-referencing. Copilot generates basic enum-based state machines but consistently misses timer management and retransmission logic.
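One detail worth making explicit: the NASTimer objects above are passive — nothing invokes on_expiry by itself. A minimal polling driver (a sketch; a production NAS stack would use an event loop or timer wheel rather than polling) could look like this, with the timer class restated so the snippet is self-contained:

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional

# Minimal restatement of the NASTimer shape used above,
# so this sketch runs on its own.
@dataclass
class NASTimer:
    name: str
    duration_ms: int
    started_at: Optional[float] = None
    on_expiry: Optional[Callable] = None

    def start(self):
        self.started_at = time.monotonic()

    def stop(self):
        self.started_at = None

    def is_expired(self) -> bool:
        if self.started_at is None:
            return False
        return ((time.monotonic() - self.started_at) * 1000
                >= self.duration_ms)

def tick(timers: Dict[str, NASTimer]) -> int:
    """Fire on_expiry for every expired timer; return count fired.

    The callback (e.g. _on_t3580_expiry) decides whether to
    restart the timer for a retransmission or abort the
    procedure, so tick() only stops the timer before dispatch.
    """
    fired = 0
    for timer in list(timers.values()):
        if timer.is_expired():
            timer.stop()          # callback may restart it
            if timer.on_expiry:
                timer.on_expiry()
            fired += 1
    return fired

# Usage: an expired timer fires exactly once per tick
expired = NASTimer("T3580", duration_ms=0, on_expiry=lambda: None)
expired.start()
assert tick({"T3580_5": expired}) == 1
assert tick({"T3580_5": expired}) == 0  # stopped, not restarted
```

Stopping the timer before dispatching matters: _on_t3580_expiry restarts the timer when a retransmission is allowed, and calling the callback on a still-running timer would let a slow callback double-fire.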

Real-Time Signaling Pipeline

The user plane in 5G is built on GTP-U tunnels between gNB and UPF, managed by PFCP sessions between SMF and UPF. The UPF must process millions of packets per second with per-packet QoS enforcement, charging, and lawful intercept — all at line rate. Here is a GTP-U tunnel manager with PFCP session integration:

# GTP-U Tunnel Manager with PFCP Session Integration
# Reference: TS 29.281 (GTP-U), TS 29.244 (PFCP)

import struct
import socket
import asyncio
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple, List
from enum import IntEnum
import time

class GTPUMessageType(IntEnum):
    ECHO_REQUEST = 1
    ECHO_RESPONSE = 2
    ERROR_INDICATION = 26
    SUPPORTED_EXTENSION_HEADERS = 31
    END_MARKER = 254
    G_PDU = 255   # User data

class PFCPMessageType(IntEnum):
    HEARTBEAT_REQUEST = 1
    HEARTBEAT_RESPONSE = 2
    SESSION_ESTABLISHMENT_REQUEST = 50
    SESSION_ESTABLISHMENT_RESPONSE = 51
    SESSION_MODIFICATION_REQUEST = 52
    SESSION_MODIFICATION_RESPONSE = 53
    SESSION_DELETION_REQUEST = 54
    SESSION_DELETION_RESPONSE = 55
    SESSION_REPORT_REQUEST = 56
    SESSION_REPORT_RESPONSE = 57

@dataclass
class GTPUHeader:
    """GTP-U header (TS 29.281 Section 5.1).

    Flags: Version(3b)=1, PT(1b)=1, E/S/PN bits
    Message Type: 255 for G-PDU (user data)
    Length: payload length excluding first 8 mandatory bytes
    TEID: Tunnel Endpoint Identifier (32-bit)
    """
    version: int = 1
    pt: int = 1          # Protocol Type (1=GTP)
    e_flag: int = 0      # Extension header flag
    s_flag: int = 0      # Sequence number flag
    pn_flag: int = 0     # N-PDU number flag
    message_type: int = GTPUMessageType.G_PDU
    length: int = 0
    teid: int = 0
    sequence_number: Optional[int] = None
    n_pdu_number: Optional[int] = None
    extension_headers: List[bytes] = field(
        default_factory=list)

    def encode(self) -> bytes:
        """Encode GTP-U header to bytes."""
        flags = ((self.version & 0x07) << 5) | \
                ((self.pt & 0x01) << 4) | \
                ((self.e_flag & 0x01) << 2) | \
                ((self.s_flag & 0x01) << 1) | \
                (self.pn_flag & 0x01)

        header = struct.pack("!BBHI",
                            flags,
                            self.message_type,
                            self.length,
                            self.teid)

        # Optional fields present if any of E/S/PN set
        if self.e_flag or self.s_flag or self.pn_flag:
            seq = self.sequence_number or 0
            npdu = self.n_pdu_number or 0
            ext_type = (self.extension_headers[0][0]
                       if self.extension_headers else 0)
            header += struct.pack("!HBB", seq, npdu, ext_type)

            # extension_headers are stored type-prefixed; the type
            # byte was already emitted in the next-extension-type
            # field above, so append only the body (length,
            # content, next-type)
            for ext in self.extension_headers:
                header += ext[1:]

        return header

    @classmethod
    def decode(cls, data: bytes) -> Tuple["GTPUHeader", int]:
        """Decode GTP-U header from bytes. Returns (header, offset)."""
        if len(data) < 8:
            raise ValueError("GTP-U header too short")

        flags, msg_type, length, teid = struct.unpack(
            "!BBHI", data[:8])

        header = cls()
        header.version = (flags >> 5) & 0x07
        header.pt = (flags >> 4) & 0x01
        header.e_flag = (flags >> 2) & 0x01
        header.s_flag = (flags >> 1) & 0x01
        header.pn_flag = flags & 0x01
        header.message_type = msg_type
        header.length = length
        header.teid = teid

        offset = 8

        if header.e_flag or header.s_flag or header.pn_flag:
            if len(data) < 12:
                raise ValueError(
                    "GTP-U optional fields truncated")
            seq, npdu, ext_type = struct.unpack(
                "!HBB", data[8:12])
            header.sequence_number = seq
            header.n_pdu_number = npdu
            offset = 12

            # Parse extension header chain: each header is
            # length(1 octet, in 4-byte units) + content +
            # next-type(1 octet). Store type-prefixed so the
            # representation matches encode()/encapsulate().
            while ext_type != 0:
                if offset >= len(data):
                    break
                ext_len = data[offset] * 4  # length in 4-byte units
                if ext_len == 0 or offset + ext_len > len(data):
                    break  # malformed; a zero length would loop forever
                header.extension_headers.append(
                    bytes([ext_type])
                    + data[offset:offset + ext_len])
                ext_type = data[offset + ext_len - 1]
                offset += ext_len

        return header, offset

@dataclass
class TunnelEndpoint:
    """GTP-U tunnel endpoint definition."""
    teid: int
    remote_addr: str
    remote_port: int = 2152  # Standard GTP-U port
    local_teid: int = 0
    qfi: int = 0             # QoS Flow Identifier

    # Traffic counters
    rx_packets: int = 0
    rx_bytes: int = 0
    tx_packets: int = 0
    tx_bytes: int = 0

    # Sequence numbering
    tx_sequence: int = 0
    rx_expected_sequence: int = 0

@dataclass
class PFCPSession:
    """PFCP session context (TS 29.244)."""
    seid_local: int          # Local Session Endpoint ID
    seid_remote: int = 0     # Remote SEID (from SMF)
    pdr_list: Dict[int, dict] = field(default_factory=dict)
    far_list: Dict[int, dict] = field(default_factory=dict)
    qer_list: Dict[int, dict] = field(default_factory=dict)
    urr_list: Dict[int, dict] = field(default_factory=dict)

    # PDR: Packet Detection Rule (match criteria)
    # FAR: Forwarding Action Rule (what to do with matched packet)
    # QER: QoS Enforcement Rule (rate limiting)
    # URR: Usage Reporting Rule (charging)

class GTPUTunnelManager:
    """Manages GTP-U tunnels with PFCP session integration.

    Handles tunnel creation/deletion, packet encapsulation/
    decapsulation, QoS marking, and usage metering.
    """

    def __init__(self, local_addr: str, gtpu_port: int = 2152):
        self.local_addr = local_addr
        self.gtpu_port = gtpu_port
        self.tunnels: Dict[int, TunnelEndpoint] = {}  # by local TEID
        self.pfcp_sessions: Dict[int, PFCPSession] = {}
        self._next_teid = 1
        self._next_seid = 1
        self.socket: Optional[socket.socket] = None

    def allocate_teid(self) -> int:
        """Allocate unique local TEID (32-bit, wrapping)."""
        teid = self._next_teid
        # Python ints never overflow, so wrap explicitly at 2^32
        self._next_teid = (self._next_teid + 1) & 0xFFFFFFFF
        # TEID 0 is reserved (TS 29.281 Section 5.1)
        if self._next_teid == 0:
            self._next_teid = 1
        return teid

    def create_tunnel(self, remote_addr: str,
                      remote_teid: int,
                      qfi: int = 0) -> TunnelEndpoint:
        """Create a new GTP-U tunnel endpoint."""
        local_teid = self.allocate_teid()

        tunnel = TunnelEndpoint(
            teid=remote_teid,
            remote_addr=remote_addr,
            local_teid=local_teid,
            qfi=qfi,
        )
        self.tunnels[local_teid] = tunnel
        return tunnel

    def encapsulate(self, tunnel: TunnelEndpoint,
                    payload: bytes,
                    qfi: Optional[int] = None) -> bytes:
        """Encapsulate IP packet in GTP-U.

        Adds GTP-U header with remote TEID, optional QFI
        extension header for 5G QoS marking.
        """
        use_qfi = qfi if qfi is not None else tunnel.qfi

        header = GTPUHeader(
            message_type=GTPUMessageType.G_PDU,
            teid=tunnel.teid,  # Remote TEID
        )

        # Add PDU Session Container extension header for 5G
        # (TS 38.415 DL PDU SESSION INFORMATION: octet 1 carries
        # the PDU type, octet 2 the QFI)
        if use_qfi > 0:
            header.e_flag = 1
            # Extension header type 0x85 = PDU Session Container
            ext = struct.pack("!BBBB",
                             1,               # length in 4-byte units
                             0x00,            # PDU type 0 (DL), flags 0
                             use_qfi & 0x3F,  # QFI
                             0)               # next extension type
            header.extension_headers = [
                bytes([0x85]) + ext]

        header.length = len(payload)
        if header.e_flag or header.s_flag or header.pn_flag:
            header.length += 4  # optional header fields
            for ext in header.extension_headers:
                # stored type-prefixed; the type byte is not part
                # of the extension header body on the wire
                header.length += len(ext) - 1

        # Update counters
        tunnel.tx_packets += 1
        tunnel.tx_bytes += len(payload)
        tunnel.tx_sequence = \
            (tunnel.tx_sequence + 1) & 0xFFFF

        return header.encode() + payload

    def decapsulate(self, data: bytes) \
            -> Optional[Tuple[TunnelEndpoint, bytes, int]]:
        """Decapsulate GTP-U packet.

        Returns (tunnel, payload, qfi) or None if unknown TEID.
        """
        header, offset = GTPUHeader.decode(data)

        if header.message_type == GTPUMessageType.ECHO_REQUEST:
            self._handle_echo_request(data)
            return None

        if header.message_type != GTPUMessageType.G_PDU:
            return None

        # Look up tunnel by local TEID
        local_teid = header.teid
        tunnel = self.tunnels.get(local_teid)
        if not tunnel:
            # Unknown TEID - send Error Indication
            self._send_error_indication(
                local_teid, data)
            return None

        # Extract QFI from PDU Session Container if present
        # (type-prefixed form: type, length, PDU type, QFI, next)
        qfi = 0
        if header.e_flag and header.extension_headers:
            for ext in header.extension_headers:
                if len(ext) >= 4 and ext[0] == 0x85:
                    qfi = ext[3] & 0x3F

        payload = data[offset:]

        # Update counters
        tunnel.rx_packets += 1
        tunnel.rx_bytes += len(payload)

        return tunnel, payload, qfi

    def apply_pfcp_rules(self, session: PFCPSession,
                         packet: bytes, direction: str) \
            -> Optional[Tuple[str, dict]]:
        """Apply PFCP PDR/FAR rules to a packet.

        Matches packet against PDRs, applies the associated
        FAR action (forward, duplicate, buffer, drop).
        """
        # Match against PDRs in priority order
        matched_pdr = None
        for pdr_id, pdr in sorted(
                session.pdr_list.items(),
                key=lambda x: x[1].get("precedence", 255)):
            if self._packet_matches_pdr(
                    packet, pdr, direction):
                matched_pdr = pdr
                break

        if not matched_pdr:
            return None  # No matching rule, drop

        # Get associated FAR
        far_id = matched_pdr.get("far_id")
        if far_id is None or far_id not in session.far_list:
            return None

        far = session.far_list[far_id]
        action = far.get("apply_action", "drop")

        # Apply QER if present
        qer_id = matched_pdr.get("qer_id")
        if qer_id is not None and qer_id in session.qer_list:
            qer = session.qer_list[qer_id]
            if not self._check_qos_enforcement(
                    packet, qer, direction):
                return ("drop", {"reason": "QER_EXCEEDED"})

        # Apply URR for usage reporting
        urr_id = matched_pdr.get("urr_id")
        if urr_id is not None and urr_id in session.urr_list:
            self._update_usage_report(
                session.urr_list[urr_id],
                len(packet), direction)

        return (action, far)

    def _packet_matches_pdr(self, packet: bytes,
                            pdr: dict,
                            direction: str) -> bool:
        """Check if packet matches PDR criteria."""
        pdi = pdr.get("pdi", {})

        # Source interface check
        if pdi.get("source_interface") and \
                pdi["source_interface"] != direction:
            return False

        # TEID match (for GTP-U encapsulated traffic)
        if "local_f_teid" in pdi:
            # Already matched by tunnel lookup
            pass

        # UE IP address match
        if "ue_ip_address" in pdi and len(packet) >= 20:
            version = (packet[0] >> 4) & 0x0F
            if version == 4:
                if direction == "uplink":
                    src_ip = socket.inet_ntoa(packet[12:16])
                    if src_ip != pdi["ue_ip_address"]:
                        return False
                else:
                    dst_ip = socket.inet_ntoa(packet[16:20])
                    if dst_ip != pdi["ue_ip_address"]:
                        return False

        # SDF filter (5-tuple matching)
        if "sdf_filter" in pdi:
            if not self._match_sdf_filter(
                    packet, pdi["sdf_filter"]):
                return False

        return True

    def _check_qos_enforcement(self, packet: bytes,
                                qer: dict,
                                direction: str) -> bool:
        """Enforce QoS rate limits (token bucket)."""
        key = "dl" if direction == "downlink" else "ul"
        mbr = qer.get(f"maximum_bitrate_{key}", 0)
        if mbr == 0:
            return True  # No limit

        # Token bucket algorithm
        bucket = qer.setdefault(f"_bucket_{key}", {
            "tokens": mbr,
            "last_update": time.monotonic(),
            "rate": mbr,  # bits per second
        })

        now = time.monotonic()
        elapsed = now - bucket["last_update"]
        bucket["tokens"] = min(
            bucket["rate"],
            bucket["tokens"] + elapsed * bucket["rate"])
        bucket["last_update"] = now

        packet_bits = len(packet) * 8
        if bucket["tokens"] >= packet_bits:
            bucket["tokens"] -= packet_bits
            return True
        return False

    def _update_usage_report(self, urr: dict,
                              packet_size: int,
                              direction: str):
        """Update usage reporting counters."""
        key = f"volume_{direction}"
        urr[key] = urr.get(key, 0) + packet_size
        urr[f"packets_{direction}"] = \
            urr.get(f"packets_{direction}", 0) + 1

    def _handle_echo_request(self, data): pass
    def _send_error_indication(self, teid, data): pass
    def _match_sdf_filter(self, packet, sdf): return True

The critical detail AI tools miss in GTP-U: TEID 0 is reserved and must never be used for user data tunnels (TS 29.281 Section 5.1), the PDU Session Container extension header (type 0x85) is mandatory for 5G to carry QFI, and the Error Indication message must be sent back when a G-PDU arrives with an unknown TEID rather than silently dropping it. Claude Code handles the GTP-U header encoding correctly because it can reason about the bit-level flag encoding (version, PT, E/S/PN in a single byte). Copilot generates reasonable struct.pack/unpack code for binary protocols but misses the extension header chaining mechanism. Cursor helps most when GTP-U handling, PFCP session management, and QoS enforcement code are in separate modules that reference each other.
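Those three details are cheap to verify mechanically. Here is a minimal encoder sketch under stated assumptions — `encode_gpdu` and the constant names are illustrative, not from any library — that sets the E flag, chains a PDU Session Container (type 0x85) carrying the QFI, and refuses the reserved TEID 0:

```python
import struct

GTPU_MSG_GPDU = 0xFF
EXT_PDU_SESSION_CONTAINER = 0x85  # carries the QFI in 5G


def encode_gpdu(teid: int, payload: bytes, qfi: int) -> bytes:
    """Encode a GTP-U G-PDU with a chained PDU Session
    Container extension header (illustrative sketch)."""
    if teid == 0:
        # TEID 0 is reserved (TS 29.281 Section 5.1) and must
        # never identify a user-data tunnel.
        raise ValueError("TEID 0 is reserved")

    # DL PDU Session Information (TS 38.415): PDU type 0 in
    # the high nibble, QFI in the low 6 bits of octet 2.
    container = bytes([0x00, qfi & 0x3F])
    # Extension header: length in 4-octet units, content,
    # next-extension-header type (0 = no more headers).
    ext = bytes([1]) + container + bytes([0x00])
    assert len(ext) % 4 == 0

    # When any of E/S/PN is set, all three optional fields
    # (sequence number, N-PDU number) are present; here they
    # are zeroed and followed by the first extension type.
    opt = struct.pack("!HBB", 0, 0, EXT_PDU_SESSION_CONTAINER)

    # Flags octet: version=1, PT=1 (GTP), E=1, S=0, PN=0.
    flags = (1 << 5) | (1 << 4) | (1 << 2)
    body = opt + ext + payload
    # Length field covers everything after the fixed 8-byte
    # header, including the optional fields and extensions.
    header = struct.pack("!BBHI", flags, GTPU_MSG_GPDU,
                         len(body), teid)
    return header + body
```

Decoding simply walks the next-extension-type chain until it reads 0 — the chaining mechanism Copilot tends to miss.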

3GPP Conformance Testing

Conformance testing in telecommunications is fundamentally different from unit testing in application development. Test cases are derived directly from 3GPP specification procedures, and each test verifies a specific protocol behavior with specific message sequences, timer values, and state transitions. ETSI publishes conformance test suites in TTCN-3 (Testing and Test Control Notation, version 3), and even when implementing custom test frameworks, the test structure must mirror the 3GPP specification organization:

# 3GPP-style Conformance Test Framework
# Protocol conformance testing for NR RRC procedures

from dataclasses import dataclass, field
from typing import List, Optional, Dict, Callable, Any
from enum import Enum, auto
import time

class TestVerdict(Enum):
    PASS = auto()
    FAIL = auto()
    INCONCLUSIVE = auto()
    ERROR = auto()
    NONE = auto()

class MessageDirection(Enum):
    UE_TO_NETWORK = auto()  # Uplink
    NETWORK_TO_UE = auto()  # Downlink

@dataclass
class ProtocolMessage:
    """Represents a protocol message in a test sequence."""
    direction: MessageDirection
    message_type: str
    ies: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0

@dataclass
class TestStep:
    """A single step in a conformance test case."""
    step_number: int
    description: str
    action: Callable
    expected_verdict: TestVerdict = TestVerdict.PASS
    timeout_ms: int = 5000

@dataclass
class ConformanceTestCase:
    """3GPP-style conformance test case.

    Structure mirrors 3GPP TS 38.523-1 (5GS; UE conformance
    specification; Part 1: Protocol).
    """
    test_id: str         # e.g., "8.1.1.1.1"
    title: str
    spec_reference: str  # e.g., "TS 38.331 Section 5.3.3"
    purpose: str
    preconditions: List[str] = field(default_factory=list)
    steps: List[TestStep] = field(default_factory=list)
    verdict: TestVerdict = TestVerdict.NONE
    messages_captured: List[ProtocolMessage] = field(
        default_factory=list)

class NRRRCConformanceSuite:
    """Conformance test suite for NR RRC procedures.

    Test cases derived from 3GPP TS 38.523-1.
    """

    def __init__(self, ue_under_test, network_simulator):
        self.ue = ue_under_test
        self.network = network_simulator
        self.test_cases: List[ConformanceTestCase] = []
        self._build_test_cases()

    def _build_test_cases(self):
        """Build conformance test cases from spec."""

        # TC 8.1.1.1.1: RRC Connection Setup - Normal
        tc = ConformanceTestCase(
            test_id="8.1.1.1.1",
            title="RRC connection setup - success",
            spec_reference="TS 38.331 Section 5.3.3",
            purpose="Verify UE correctly performs RRC connection "
                    "setup procedure when network accepts the "
                    "request.",
            preconditions=[
                "UE is in RRC_IDLE state",
                "UE is camped on a suitable NR cell",
                "NAS has triggered connection establishment",
            ],
        )

        tc.steps = [
            TestStep(1,
                "UE sends RRCSetupRequest on SRB0 (CCCH)",
                lambda: self._verify_rrc_setup_request()),
            TestStep(2,
                "Network sends RRCSetup",
                lambda: self._send_rrc_setup()),
            TestStep(3,
                "Verify UE transitions to RRC_CONNECTED",
                lambda: self._verify_ue_state("RRC_CONNECTED")),
            TestStep(4,
                "Verify UE sends RRCSetupComplete on SRB1",
                lambda: self._verify_rrc_setup_complete()),
            TestStep(5,
                "Verify T300 is stopped",
                lambda: self._verify_timer_stopped("T300")),
            TestStep(6,
                "Verify SRB1 is established",
                lambda: self._verify_srb_established(1)),
        ]
        self.test_cases.append(tc)

        # TC 8.1.1.2.1: RRC Setup - T300 Expiry
        tc_t300 = ConformanceTestCase(
            test_id="8.1.1.2.1",
            title="RRC connection setup - T300 expiry and retry",
            spec_reference="TS 38.331 Section 5.3.3.8",
            purpose="Verify UE retransmits RRCSetupRequest on "
                    "T300 expiry up to max retransmissions, then "
                    "informs NAS of failure.",
            preconditions=[
                "UE is in RRC_IDLE state",
                "UE is camped on a suitable NR cell",
                "Network is configured to NOT respond to "
                "RRCSetupRequest",
            ],
        )

        tc_t300.steps = [
            TestStep(1,
                "UE sends RRCSetupRequest on SRB0",
                lambda: self._verify_rrc_setup_request()),
            TestStep(2,
                "Wait for T300 expiry (network does not respond)",
                lambda: self._wait_for_timer_expiry("T300"),
                timeout_ms=2000),
            TestStep(3,
                "Verify UE retransmits RRCSetupRequest",
                lambda: self._verify_retransmission(
                    "RRCSetupRequest"),
                timeout_ms=2000),
            TestStep(4,
                "Repeat until max retransmissions (4x)",
                lambda: self._verify_max_retransmissions(
                    "RRCSetupRequest", 4),
                timeout_ms=20000),
            TestStep(5,
                "Verify UE informs NAS of setup failure",
                lambda: self._verify_nas_notification(
                    "RRC_SETUP_FAILURE")),
            TestStep(6,
                "Verify UE remains in RRC_IDLE",
                lambda: self._verify_ue_state("RRC_IDLE")),
        ]
        self.test_cases.append(tc_t300)

        # TC 8.1.3.1.1: RRC Reconfiguration - Handover
        tc_ho = ConformanceTestCase(
            test_id="8.1.3.1.1",
            title="RRC reconfiguration - intra-NR handover",
            spec_reference="TS 38.331 Section 5.3.5.4",
            purpose="Verify UE correctly executes handover via "
                    "RRCReconfiguration with "
                    "reconfigurationWithSync.",
            preconditions=[
                "UE is in RRC_CONNECTED state",
                "Active DRB established",
                "Measurement reporting configured",
            ],
        )

        tc_ho.steps = [
            TestStep(1,
                "Configure measurement (Event A3, offset 3dB)",
                lambda: self._configure_measurement_a3(
                    offset_db=3.0)),
            TestStep(2,
                "Simulate neighbour cell becoming stronger",
                lambda: self._set_cell_rsrp(
                    target_pci=2, rsrp=-80,
                    serving_rsrp=-90)),
            TestStep(3,
                "Verify UE sends MeasurementReport with A3 event",
                lambda: self._verify_measurement_report(
                    event="A3", pci=2),
                timeout_ms=5000),
            TestStep(4,
                "Send RRCReconfiguration with "
                "reconfigurationWithSync",
                lambda: self._send_handover_command(
                    target_pci=2)),
            TestStep(5,
                "Verify T304 is started",
                lambda: self._verify_timer_running("T304")),
            TestStep(6,
                "Verify UE performs RACH on target cell",
                lambda: self._verify_rach_on_target(pci=2),
                timeout_ms=1000),
            TestStep(7,
                "Verify UE sends RRCReconfigurationComplete",
                lambda: self._verify_message_sent(
                    "RRCReconfigurationComplete")),
            TestStep(8,
                "Verify T304 is stopped",
                lambda: self._verify_timer_stopped("T304")),
            TestStep(9,
                "Verify data plane continuity on new cell",
                lambda: self._verify_data_continuity()),
        ]
        self.test_cases.append(tc_ho)

    def run_test_case(self, test_id: str) -> TestVerdict:
        """Execute a single conformance test case."""
        tc = next((t for t in self.test_cases
                   if t.test_id == test_id), None)
        if not tc:
            raise ValueError(f"Test case {test_id} not found")

        # Verify preconditions
        for precond in tc.preconditions:
            if not self._check_precondition(precond):
                tc.verdict = TestVerdict.INCONCLUSIVE
                return tc.verdict

        # Execute steps
        for step in tc.steps:
            try:
                start = time.monotonic()
                result = step.action()
                elapsed_ms = (time.monotonic() - start) * 1000

                if elapsed_ms > step.timeout_ms:
                    tc.verdict = TestVerdict.FAIL
                    self._log_failure(tc, step,
                        f"Step timed out ({elapsed_ms:.0f}ms "
                        f"> {step.timeout_ms}ms)")
                    return tc.verdict

                if result is False:
                    tc.verdict = TestVerdict.FAIL
                    self._log_failure(tc, step,
                        "Step verification failed")
                    return tc.verdict

            except Exception as e:
                tc.verdict = TestVerdict.ERROR
                self._log_failure(tc, step, str(e))
                return tc.verdict

        tc.verdict = TestVerdict.PASS
        return tc.verdict

    def run_suite(self) -> Dict[str, TestVerdict]:
        """Run all test cases and return results."""
        results = {}
        for tc in self.test_cases:
            results[tc.test_id] = self.run_test_case(tc.test_id)
        return results

    # Verification methods (implementation depends on test harness)
    def _verify_rrc_setup_request(self): return True
    def _send_rrc_setup(self): return True
    def _verify_ue_state(self, state): return True
    def _verify_rrc_setup_complete(self): return True
    def _verify_timer_stopped(self, name): return True
    def _verify_timer_running(self, name): return True
    def _verify_srb_established(self, srb_id): return True
    def _wait_for_timer_expiry(self, name): return True
    def _verify_retransmission(self, msg_type): return True
    def _verify_max_retransmissions(self, msg, count): return True
    def _verify_nas_notification(self, event): return True
    def _configure_measurement_a3(self, offset_db): return True
    def _set_cell_rsrp(self, **kwargs): return True
    def _verify_measurement_report(self, **kwargs): return True
    def _send_handover_command(self, target_pci): return True
    def _verify_rach_on_target(self, pci): return True
    def _verify_message_sent(self, msg_type): return True
    def _verify_data_continuity(self): return True
    def _check_precondition(self, precond): return True
    def _log_failure(self, tc, step, msg): pass

Conformance test structure is fundamentally different from unit testing: each test case maps to a specific 3GPP specification section, preconditions establish the protocol state before the test begins, and steps follow the exact message exchange sequence defined in the specification. AI tools that generate generic pytest-style test cases miss the protocol-level structure entirely — there is no “mock” for a radio link failure, and asserting on function return values does not verify that the correct RRC message was sent on the correct SRB with the correct ASN.1-encoded IEs. Claude Code understands the conformance test structure because it can reference the procedural descriptions in 3GPP specifications. Cursor helps when test cases, protocol handlers, and message encoders are in the same project, allowing cross-file verification.
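To make that concrete: the stubbed verifiers in the framework above ultimately reduce to assertions over captured protocol messages, not over handler return values. A minimal sketch of what a `_verify_message_sent`-style check can look like — `verify_message_sent` and the trace shape here are illustrative; a real harness would decode the ASN.1-encoded IEs from captured PDUs:

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Dict, List, Optional


class MessageDirection(Enum):
    UE_TO_NETWORK = auto()  # Uplink
    NETWORK_TO_UE = auto()  # Downlink


@dataclass
class ProtocolMessage:
    """A captured protocol message with its decoded IEs."""
    direction: MessageDirection
    message_type: str
    ies: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0


def verify_message_sent(captured: List[ProtocolMessage],
                        message_type: str,
                        required_ies: Optional[Dict[str, Any]]
                        = None) -> bool:
    """Pass only if the UE actually emitted the message with
    the required IEs set to the required values."""
    for msg in captured:
        # Only uplink messages count as "sent by the UE".
        if msg.direction is not MessageDirection.UE_TO_NETWORK:
            continue
        if msg.message_type != message_type:
            continue
        if required_ies and any(
                msg.ies.get(k) != v
                for k, v in required_ies.items()):
            continue
        return True
    return False
```

The verdict comes from what went over the air, which is the whole point of conformance testing.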

Network Telemetry Collection

Telecom network telemetry requires domain-specific KPI computation from raw performance counters, alarm correlation across network layers, and YANG model-driven data collection. A telemetry system that does not understand the relationship between radio-level counters (RRC setup attempts/completions) and the KPIs operators track (CSSR, the call setup success rate; CDR, the call drop rate) provides raw data without operational insight:

# Telecom Network Telemetry: KPI Engine + Alarm Correlator
# YANG-driven collection, 3GPP KPI computation, X.733 alarms

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from enum import Enum, auto
from datetime import datetime, timedelta
import re

class AlarmSeverity(Enum):
    CRITICAL = 1
    MAJOR = 2
    MINOR = 3
    WARNING = 4
    INDETERMINATE = 5
    CLEARED = 6

class AlarmEventType(Enum):
    COMMUNICATIONS = auto()
    PROCESSING = auto()
    ENVIRONMENT = auto()
    QUALITY_OF_SERVICE = auto()
    EQUIPMENT = auto()

@dataclass
class X733Alarm:
    """ITU-T X.733 structured alarm."""
    alarm_id: str
    managed_object_class: str   # e.g., "NRCellDU", "GnbDuFunction"
    managed_object_instance: str  # DN path
    event_type: AlarmEventType
    probable_cause: str          # e.g., "threshold-crossed"
    perceived_severity: AlarmSeverity
    specific_problem: str
    additional_text: str
    event_time: datetime
    notification_id: int = 0
    correlated_notifications: List[int] = field(
        default_factory=list)
    root_cause_alarm_id: Optional[str] = None

@dataclass
class PerformanceCounter:
    """Raw performance counter from a network element."""
    counter_name: str      # 3GPP-defined counter name
    counter_value: float
    collection_time: datetime
    granularity_period_s: int = 900  # Typically 15 min
    managed_element: str = ""
    cell_id: str = ""

class TelecomKPIEngine:
    """Computes telecom KPIs from raw 3GPP performance counters.

    KPI definitions from 3GPP TS 28.552 (5G performance
    measurements) and TS 28.554 (5G end-to-end KPIs).
    """

    def __init__(self):
        self.counter_store: Dict[str, List[PerformanceCounter]] = {}
        self.kpi_history: Dict[str, List[Tuple[datetime, float]]] = {}

    def ingest_counters(self, counters: List[PerformanceCounter]):
        """Ingest raw performance counters."""
        for counter in counters:
            key = (f"{counter.managed_element}:"
                   f"{counter.cell_id}:"
                   f"{counter.counter_name}")
            if key not in self.counter_store:
                self.counter_store[key] = []
            self.counter_store[key].append(counter)

    def compute_cssr(self, cell_id: str,
                     period_start: datetime,
                     period_end: datetime) -> Optional[float]:
        """Call Setup Success Rate (CSSR).

        CSSR = (RRC.ConnEstabSucc / RRC.ConnEstabAtt) * 100

        Counters (TS 28.552):
        - RRC.ConnEstabAtt: Total RRC connection setup attempts
        - RRC.ConnEstabSucc: Successful RRC setups (per cause)

        Target: >= 99.0%
        """
        attempts = self._sum_counter(
            cell_id, "RRC.ConnEstabAtt",
            period_start, period_end)
        successes = self._sum_counter(
            cell_id, "RRC.ConnEstabSucc",
            period_start, period_end)

        if attempts == 0:
            return None  # No data

        cssr = (successes / attempts) * 100.0
        self._store_kpi(f"{cell_id}:CSSR",
                       period_end, cssr)
        return cssr

    def compute_cdr(self, cell_id: str,
                    period_start: datetime,
                    period_end: datetime) -> Optional[float]:
        """Call Drop Rate (CDR).

        CDR = (RRC.ConnRelAbnormal / RRC.ConnMean) * 100

        Counters:
        - RRC.ConnRelAbnormal: Abnormal RRC releases
          (radio link failure, handover failure, etc.)
        - RRC.ConnMean: Mean number of active RRC connections
          during the measurement period

        Target: <= 1.0%
        """
        abnormal_releases = self._sum_counter(
            cell_id, "RRC.ConnRelAbnormal",
            period_start, period_end)
        mean_connections = self._avg_counter(
            cell_id, "RRC.ConnMean",
            period_start, period_end)

        if mean_connections == 0:
            return None

        cdr = (abnormal_releases / mean_connections) * 100.0
        self._store_kpi(f"{cell_id}:CDR", period_end, cdr)
        return cdr

    def compute_hosr(self, cell_id: str,
                     period_start: datetime,
                     period_end: datetime) -> Optional[float]:
        """Handover Success Rate (HOSR).

        HOSR = (HO.ExeSucc / HO.ExeAtt) * 100

        Counters (TS 28.552):
        - HO.ExeAtt: Handover execution attempts
          (intra-freq + inter-freq + inter-RAT)
        - HO.ExeSucc: Successful handover executions

        Target: >= 98.0%
        """
        ho_att = self._sum_counter(
            cell_id, "HO.ExeAtt",
            period_start, period_end)
        ho_succ = self._sum_counter(
            cell_id, "HO.ExeSucc",
            period_start, period_end)

        if ho_att == 0:
            return None

        hosr = (ho_succ / ho_att) * 100.0
        self._store_kpi(f"{cell_id}:HOSR", period_end, hosr)
        return hosr

    def compute_dl_throughput(self, cell_id: str,
                              period_start: datetime,
                              period_end: datetime) \
            -> Optional[float]:
        """Average downlink cell throughput (Mbps).

        DL Throughput = (DRB.PdcpSduBitrateDl.Mean)
        Or derived from: DRB.PdcpSduVolumeDL / period_seconds
        """
        # The volume counter is assumed to be reported in
        # bytes here; check the unit your collector delivers
        # (TS 28.552 defines it) and adjust the *8 conversion.
        volume_bytes = self._sum_counter(
            cell_id, "DRB.PdcpSduVolumeDL",
            period_start, period_end)

        period_s = (period_end - period_start).total_seconds()
        if period_s == 0:
            return None

        throughput_mbps = (volume_bytes * 8) / (period_s * 1e6)
        self._store_kpi(f"{cell_id}:DL_THROUGHPUT",
                       period_end, throughput_mbps)
        return throughput_mbps

    def compute_rrc_setup_time(self, cell_id: str,
                                period_start: datetime,
                                period_end: datetime) \
            -> Optional[float]:
        """Average RRC Connection Setup Time (ms).

        Derived from:
        RRC.ConnEstabTimeMean or
        (RRC.ConnEstabTimeSum / RRC.ConnEstabSucc)
        """
        time_sum = self._sum_counter(
            cell_id, "RRC.ConnEstabTimeSum",
            period_start, period_end)
        successes = self._sum_counter(
            cell_id, "RRC.ConnEstabSucc",
            period_start, period_end)

        if successes == 0:
            return None

        avg_ms = time_sum / successes
        self._store_kpi(f"{cell_id}:RRC_SETUP_TIME",
                       period_end, avg_ms)
        return avg_ms

    def _sum_counter(self, cell_id, counter_name,
                     start, end) -> float:
        total = 0.0
        for key, counters in self.counter_store.items():
            # Keys are "element:cell:counter"; match the cell
            # and counter fields exactly. Substring matching
            # would let "RRC.ConnRel" match
            # "RRC.ConnRelAbnormal" and cell "1" match "11".
            _, cid, cname = key.split(":", 2)
            if cid != cell_id or cname != counter_name:
                continue
            for c in counters:
                if start <= c.collection_time <= end:
                    total += c.counter_value
        return total

    def _avg_counter(self, cell_id, counter_name,
                     start, end) -> float:
        values = []
        for key, counters in self.counter_store.items():
            _, cid, cname = key.split(":", 2)
            if cid != cell_id or cname != counter_name:
                continue
            for c in counters:
                if start <= c.collection_time <= end:
                    values.append(c.counter_value)
        return sum(values) / len(values) if values else 0.0

    def _store_kpi(self, kpi_key, timestamp, value):
        if kpi_key not in self.kpi_history:
            self.kpi_history[kpi_key] = []
        self.kpi_history[kpi_key].append((timestamp, value))


class AlarmCorrelator:
    """Correlates alarms across network layers.

    Identifies root causes by analyzing temporal and topological
    relationships between alarms from different network elements.
    """

    def __init__(self):
        self.active_alarms: Dict[str, X733Alarm] = {}
        self.correlation_rules: List[dict] = []
        self._init_rules()

    def _init_rules(self):
        """Define alarm correlation rules.

        Rules encode domain knowledge about cause-effect
        relationships in telecom networks.
        """
        self.correlation_rules = [
            {
                "name": "fiber_cut_radio_impact",
                "root_cause": {
                    "managed_object_class": "TransportLink",
                    "probable_cause": "loss-of-signal",
                },
                "symptoms": [
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "communication-failure"},
                    {"managed_object_class": "GnbDuFunction",
                     "probable_cause": "back-haul-failure"},
                ],
                "time_window_s": 60,
                "description": "Transport link failure causes "
                              "cell outages on connected gNBs",
            },
            {
                "name": "power_failure_cascade",
                "root_cause": {
                    "managed_object_class": "PowerSupply",
                    "event_type": AlarmEventType.ENVIRONMENT,
                },
                "symptoms": [
                    {"managed_object_class": "GnbDuFunction",
                     "probable_cause": "equipment-malfunction"},
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "communication-failure"},
                ],
                "time_window_s": 30,
                "description": "Power supply failure causes "
                              "full site outage",
            },
            {
                "name": "core_nf_overload",
                "root_cause": {
                    "managed_object_class": "AMFFunction",
                    "probable_cause": "threshold-crossed",
                    "specific_problem_pattern": ".*cpu.*|.*memory.*",
                },
                "symptoms": [
                    {"managed_object_class": "NRCellDU",
                     "probable_cause": "threshold-crossed",
                     "specific_problem_pattern": ".*CSSR.*"},
                ],
                "time_window_s": 120,
                "description": "Core NF overload causes cell-level "
                              "KPI degradation across many cells",
            },
        ]

    def process_alarm(self, alarm: X733Alarm) \
            -> Optional[str]:
        """Process incoming alarm and attempt correlation.

        Returns root_cause_alarm_id if correlated, None otherwise.
        """
        if alarm.perceived_severity == AlarmSeverity.CLEARED:
            self._clear_alarm(alarm)
            return None

        self.active_alarms[alarm.alarm_id] = alarm

        # Try to correlate with existing alarms
        for rule in self.correlation_rules:
            root_id = self._try_correlate(alarm, rule)
            if root_id:
                alarm.root_cause_alarm_id = root_id
                return root_id

        return None

    def _try_correlate(self, new_alarm: X733Alarm,
                       rule: dict) -> Optional[str]:
        """Try to correlate new alarm using a rule."""
        root_spec = rule["root_cause"]
        symptom_specs = rule["symptoms"]
        window = timedelta(seconds=rule["time_window_s"])

        # Check if new alarm matches a symptom
        is_symptom = False
        for spec in symptom_specs:
            if self._alarm_matches_spec(new_alarm, spec):
                is_symptom = True
                break

        if not is_symptom:
            # Check if new alarm IS the root cause
            if self._alarm_matches_spec(new_alarm, root_spec):
                # Look for existing symptom alarms to correlate
                for aid, existing in self.active_alarms.items():
                    if aid == new_alarm.alarm_id:
                        continue
                    time_diff = abs(
                        (existing.event_time -
                         new_alarm.event_time).total_seconds())
                    if time_diff <= rule["time_window_s"]:
                        for spec in symptom_specs:
                            if self._alarm_matches_spec(
                                    existing, spec):
                                existing.root_cause_alarm_id = \
                                    new_alarm.alarm_id
                                existing.correlated_notifications\
                                    .append(
                                        new_alarm.notification_id)
            return None

        # New alarm is a symptom - find matching root cause
        for aid, existing in self.active_alarms.items():
            if aid == new_alarm.alarm_id:
                continue

            time_diff = abs(
                (existing.event_time -
                 new_alarm.event_time).total_seconds())
            if time_diff > rule["time_window_s"]:
                continue

            if self._alarm_matches_spec(existing, root_spec):
                new_alarm.correlated_notifications.append(
                    existing.notification_id)
                return existing.alarm_id

        return None

    def _alarm_matches_spec(self, alarm: X733Alarm,
                            spec: dict) -> bool:
        """Check if alarm matches correlation rule spec."""
        if "managed_object_class" in spec:
            if alarm.managed_object_class != \
                    spec["managed_object_class"]:
                return False

        if "probable_cause" in spec:
            if alarm.probable_cause != spec["probable_cause"]:
                return False

        if "event_type" in spec:
            if alarm.event_type != spec["event_type"]:
                return False

        if "specific_problem_pattern" in spec:
            pattern = spec["specific_problem_pattern"]
            if not re.search(pattern,
                            alarm.specific_problem,
                            re.IGNORECASE):
                return False

        return True

    def _clear_alarm(self, alarm: X733Alarm):
        """Process alarm clear."""
        # Find and remove the active alarm
        # Match by managed object instance + probable cause
        to_remove = []
        for aid, active in self.active_alarms.items():
            if (active.managed_object_instance ==
                    alarm.managed_object_instance and
                active.probable_cause ==
                    alarm.probable_cause):
                to_remove.append(aid)

        for aid in to_remove:
            del self.active_alarms[aid]

    def get_root_cause_summary(self) \
            -> List[dict]:
        """Get summary of active root cause alarms
        and their correlated symptoms."""
        roots = {}
        for aid, alarm in self.active_alarms.items():
            if alarm.root_cause_alarm_id is None:
                # This might be a root cause
                roots[aid] = {
                    "alarm": alarm,
                    "symptoms": [],
                }

        for aid, alarm in self.active_alarms.items():
            if alarm.root_cause_alarm_id and \
                    alarm.root_cause_alarm_id in roots:
                roots[alarm.root_cause_alarm_id]["symptoms"]\
                    .append(alarm)

        return [
            {"root_cause": info["alarm"],
             "symptom_count": len(info["symptoms"]),
             "symptoms": info["symptoms"]}
            for info in roots.values()
            if len(info["symptoms"]) > 0
        ]

The key domain insight that AI tools miss: telecom KPIs are computed from specific 3GPP-defined counter names (TS 28.552), not from arbitrary application metrics. CSSR uses “RRC.ConnEstabAtt” and “RRC.ConnEstabSucc” — not custom counters you invent. Alarm correlation must be topology-aware: a fiber cut at one transport link can cause hundreds of cell-level alarms, and the operator needs to see one root cause, not 200 individual tickets. Claude Code excels at understanding the KPI formulas and their counter dependencies because it can reference the 3GPP performance measurement specifications. Cursor helps when YANG models, counter collection code, and KPI computation are in separate files. Copilot generates reasonable data aggregation code but uses generic metric names rather than the 3GPP-standardized counter nomenclature.
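One way to make the counter-name discipline explicit in code is to key KPI definitions directly to the standardized counter names they derive from, so an AI tool (or a reviewer) cannot silently swap in an invented metric. A compact sketch — `KPI_DEFINITIONS` and `compute_ratio_kpi` are illustrative names, not part of the engine above:

```python
from typing import Dict, Optional

# Ratio KPIs keyed to the 3GPP TS 28.552 counter names they
# are derived from: (numerator counter, denominator counter).
KPI_DEFINITIONS = {
    "CSSR": ("RRC.ConnEstabSucc", "RRC.ConnEstabAtt"),
    "HOSR": ("HO.ExeSucc", "HO.ExeAtt"),
}


def compute_ratio_kpi(kpi_name: str,
                      counters: Dict[str, float]
                      ) -> Optional[float]:
    """Return the KPI as a percentage, or None when the
    denominator counter is missing or zero (no data is not
    the same thing as 0%)."""
    numerator_name, denominator_name = KPI_DEFINITIONS[kpi_name]
    denom = counters.get(denominator_name, 0.0)
    if denom == 0:
        return None
    return 100.0 * counters.get(numerator_name, 0.0) / denom
```

For example, `compute_ratio_kpi("CSSR", {"RRC.ConnEstabAtt": 1000, "RRC.ConnEstabSucc": 993})` yields 99.3. Any KPI that cannot name its source counters is not a KPI, it is a guess.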

What AI Coding Tools Get Wrong in Telecom

After testing all major tools against telecom-specific tasks, these are the most common and most dangerous errors:

  1. Ignoring ASN.1 encoding constraints. AI tools generate JSON or protobuf serialization for RRC messages that must be ASN.1 PER-encoded. PER (Packed Encoding Rules) is a bit-level encoding where field positions are determined by the ASN.1 schema, not by field names — and a single bit offset error makes the entire message undecodable. If your spec says ASN.1, the code must use ASN.1.
  2. Missing timer management in protocol state machines. Every telecom protocol procedure is guarded by timers, and timer expiry is not an error — it is a defined state transition that must be handled explicitly. AI tools generate state machines that handle messages but ignore timers entirely, producing implementations that hang indefinitely on message loss.
  3. Generating stateless SIP handling. SIP is inherently stateful: dialogs have state (early, confirmed, terminated), transactions have state (calling, proceeding, completed), and subscriptions have state. AI tools trained primarily on HTTP code generate stateless request handlers that cannot maintain call state across re-INVITEs or handle forked responses.
  4. Not understanding 3GPP IE optionality. Information Elements in 3GPP messages have specific optionality rules: mandatory, conditional (present only when a condition is true), and optional. AI tools often include all IEs unconditionally or omit conditional IEs whose presence is required by the current protocol state, producing messages that fail conformance testing.
  5. Using HTTP/REST for signaling that requires SCTP. NG-AP (between gNB and AMF) runs over SCTP, not TCP or HTTP. SCTP provides message-oriented delivery with multi-homing and multi-streaming that are critical for signaling transport. AI tools default to HTTP/REST or TCP sockets, missing the SCTP-specific features (stream-based ordering, path failover) that the protocol depends on.
  6. Ignoring HARQ process management in MAC layer code. The MAC layer manages up to 16 HARQ processes in NR, each independently tracking transport block transmissions and retransmissions. AI tools generate MAC code that handles one transmission at a time, missing the concurrent HARQ process management that is fundamental to NR throughput.
  7. Missing GTP-U sequence number handling. GTP-U G-PDU messages carry optional sequence numbers used for reordering at handover boundaries. AI tools either ignore sequence numbers entirely or implement them incorrectly, causing packet reordering during handover that breaks TCP throughput.
  8. Generating blocking I/O in real-time signaling paths. Signaling processing has sub-millisecond latency requirements. AI tools generate synchronous database queries, file I/O, or DNS lookups in the signaling hot path, introducing latency spikes that cause timer expiries and call failures under load.
  9. Not handling protocol version negotiation. Telecom protocols evolve across 3GPP releases (Release 15, 16, 17, 18), and a UE must negotiate capabilities with the network. AI tools generate code that assumes a single protocol version, producing implementations that fail when connected to networks running different releases.
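To illustrate point 1, here is a toy unaligned-PER encoder for a single constrained INTEGER: the bit width comes from the ASN.1 constraint, not from any field name. The function and the (1..8) constraint are illustrative; production stacks use an ASN.1 compiler such as asn1c rather than hand-written encoders.

```python
# Sketch of why PER is bit-level (point 1): an unaligned-PER constrained
# INTEGER (lb..ub) is encoded as the offset from the lower bound in
# ceil(log2(range)) bits. Toy code for illustration only.

def uper_encode_constrained_int(value: int, lb: int, ub: int) -> str:
    """Return the bit string for a constrained INTEGER in unaligned PER."""
    if not lb <= value <= ub:
        raise ValueError("value outside PER constraint")
    range_size = ub - lb + 1
    if range_size == 1:
        return ""  # a fixed value occupies zero bits on the wire
    nbits = (range_size - 1).bit_length()
    return format(value - lb, f"0{nbits}b")

# A field constrained to (1..8) occupies exactly 3 bits; a one-bit
# offset error here would shift every subsequent field in the message.
print(uper_encode_constrained_int(4, 1, 8))  # "011" (offset 3 in 3 bits)
```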
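Point 2 can be sketched as a state machine in which timer expiry is itself a transition. The class below loosely mirrors the RRC T300 procedure (setup request guarded by T300), but it is a simplified illustration, not a conformant implementation.

```python
# Sketch of point 2: timer expiry handled as a defined state transition,
# not an exception or a TODO. State and timer names mirror the RRC T300
# procedure; the class itself is illustrative.

class RrcConnectionFsm:
    T300_MS = 1000  # guard timer armed when RRCSetupRequest is sent

    def __init__(self):
        self.state = "IDLE"
        self.t300_armed = False

    def send_setup_request(self):
        self.state = "WAIT_SETUP"  # waiting for RRCSetup
        self.t300_armed = True     # every waiting state has a guard timer

    def on_rrc_setup(self):
        if self.state == "WAIT_SETUP":
            self.t300_armed = False
            self.state = "CONNECTED"

    def on_t300_expiry(self):
        # Expiry is a specified transition back to IDLE, where upper
        # layers decide whether to reattempt. Without this handler the
        # FSM would wait forever on a lost RRCSetup.
        if self.state == "WAIT_SETUP":
            self.t300_armed = False
            self.state = "IDLE"

fsm = RrcConnectionFsm()
fsm.send_setup_request()
fsm.on_t300_expiry()  # message lost: FSM returns to IDLE, no hang
print(fsm.state)      # IDLE
```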
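And for point 6, a minimal sketch of a MAC entity tracking NR's 16 HARQ processes concurrently. The class and field names are assumptions for illustration; a real MAC additionally tracks NDI toggling, redundancy versions, and per-process timers.

```python
# Sketch of point 6: independent HARQ processes, each buffering its own
# transport block for retransmission. Illustrative only.

from dataclasses import dataclass
from typing import List, Optional

MAX_HARQ_PROCESSES = 16  # NR manages up to 16 HARQ processes

@dataclass
class HarqProcess:
    transport_block: Optional[bytes] = None
    tx_count: int = 0

class MacHarqEntity:
    def __init__(self) -> None:
        self.processes: List[HarqProcess] = [
            HarqProcess() for _ in range(MAX_HARQ_PROCESSES)
        ]

    def new_transmission(self, pid: int, tb: bytes) -> None:
        proc = self.processes[pid]
        proc.transport_block = tb  # buffered for potential retransmission
        proc.tx_count = 1

    def on_nack(self, pid: int) -> Optional[bytes]:
        proc = self.processes[pid]
        proc.tx_count += 1
        return proc.transport_block  # retransmit the buffered TB

    def on_ack(self, pid: int) -> None:
        self.processes[pid] = HarqProcess()  # free the process for a new TB

mac = MacHarqEntity()
mac.new_transmission(0, b"tb-a")  # processes run independently: an
mac.new_transmission(7, b"tb-b")  # in-flight TB on process 0 does not
assert mac.on_nack(0) == b"tb-a"  # block a new TB on process 7
```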

Cost Model: What Telecom Engineers Actually Need

  • $0/mo — Solo telecom researcher / hobbyist: GitHub Copilot Free (2,000 completions/month covers protocol experimentation) + Gemini CLI Free (unlimited during preview for reasoning through spec sections). Enough for learning 3GPP specs and building protocol prototypes.
  • $10/mo — Protocol developer: Copilot Pro for faster inline completions during repetitive protocol message structure coding. Good for grinding through hundreds of IE definitions and message encoding functions.
  • $20/mo — Senior telecom engineer (pick one): Claude Code ($20/mo) for spec interpretation, protocol state machine reasoning, timer interaction analysis, and conformance test design. OR Cursor Pro ($20/mo) for multi-file protocol stack development where RRC/PDCP/RLC/MAC layers reference each other. Choose Claude if your work is spec-heavy (interpreting 3GPP procedures, verifying state machine correctness). Choose Cursor if your work is implementation-heavy (writing and refactoring stack code).
  • $30/mo — Lead telecom architect: Claude Code ($20/mo) + Copilot Pro ($10/mo). Claude for spec reasoning, architecture decisions, protocol correctness verification. Copilot for fast inline completions during routine coding. Best combination for someone who both designs and implements protocol stacks.
  • $40-99/mo — Telecom vendor team seat: Cursor Business ($40/seat) for teams with shared protocol stack codebases — the multi-file indexing across shared projects is critical. Or enterprise tiers with on-premise deployment for vendors handling classified telecom infrastructure. Add Claude Code team tier for protocol reasoning alongside IDE completions.

Verification Checklist for AI-Generated Telecom Code

Before committing any AI-generated code to a telecom protocol implementation, verify:

  1. Every protocol state has explicit timer management. Every state that waits for a message must have a guard timer, and every timer expiry must trigger a defined state transition — not an exception, not a TODO, but a transition to a specific state with specific actions.
  2. ASN.1 encoding matches the spec. If the 3GPP spec defines a message using ASN.1, verify that the encoding uses the correct PER variant (aligned or unaligned), that optional IEs use the correct presence bitmap encoding, and that ENUMERATED and CHOICE types use the correct index encoding.
  3. SIP dialog state is maintained correctly. Verify that To-tags are captured from responses, that Record-Route headers are reversed for route sets, that CSeq numbers increment per-dialog, and that re-INVITE is handled as a mid-dialog operation, not a new dialog.
  4. GTP-U TEID 0 is never used for user data. TEID 0 is reserved for GTP-U path management (echo request/response). Verify that tunnel allocation starts at TEID 1.
  5. SCTP multi-homing is configured for signaling. If the code handles NG-AP, S1-AP, or Diameter signaling, verify it uses SCTP with at least two association paths and handles SCTP COMM_LOST/RESTART notifications.
  6. No blocking I/O in the signaling path. Every operation in the signaling hot path must be non-blocking. Database lookups, DNS resolution, logging to disk, and external API calls must be asynchronous or offloaded to worker threads.
  7. KPI formulas use 3GPP counter names. CSSR uses RRC.ConnEstabAtt and RRC.ConnEstabSucc, not custom metrics. CDR uses RRC.ConnRel.Abnormal, not application error rates. Verify counter names match TS 28.552.
  8. Handover handling preserves data plane continuity. During intra-NR handover, PDCP PDUs must be forwarded from source to target gNB, and the UE must re-establish the RLC layer on the target cell without losing in-transit user data.
  9. Error Indication is sent for unknown TEIDs. When a GTP-U G-PDU arrives with a TEID that does not match any active tunnel, the correct response is an Error Indication message — not silent discard.
  10. Protocol version compatibility is handled. Verify that the code includes capability negotiation or version checking for peers running different 3GPP releases, and that mandatory IEs for the negotiated version are always present.
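Checklist items 4 and 9 can be combined into one small sketch: allocation starts at TEID 1, and an unknown TEID on an incoming G-PDU triggers an Error Indication. The class and the string return values are illustrative assumptions; real code builds full GTP-U headers and sends the Error Indication on the wire.

```python
# Sketch of checklist items 4 and 9: TEID 0 is reserved for GTP-U path
# management, so allocation starts at 1, and an unknown TEID must yield
# an Error Indication, never a silent discard. Illustrative only.

class GtpUTunnelTable:
    def __init__(self):
        self.next_teid = 1  # item 4: TEID 0 is never used for user data
        self.tunnels = {}   # teid -> peer address

    def allocate(self, peer: str) -> int:
        teid = self.next_teid
        self.next_teid += 1
        self.tunnels[teid] = peer
        return teid

    def handle_gpdu(self, teid: int, payload: bytes) -> str:
        if teid not in self.tunnels:
            return "ERROR_INDICATION"  # item 9: signal, don't drop
        return "DELIVER"

table = GtpUTunnelTable()
teid = table.allocate("198.51.100.7")
print(teid)                                      # 1, never 0
print(table.handle_gpdu(0xDEAD, b"ip-packet"))   # ERROR_INDICATION
```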
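Item 3's route-set rule is easy to verify mechanically: the Record-Route headers from a 2xx response are reversed to form the UAC's route set (RFC 3261 section 12.1.2), and the To-tag becomes the dialog's remote tag. The response dict shape below is a hypothetical stand-in for a parsed message.

```python
# Sketch of checklist item 3: dialog state from a 2xx response — To-tag
# captured, Record-Route reversed into the route set. The dict shape is
# illustrative; real stacks parse RFC 3261 wire format.

def build_dialog_state(response: dict) -> dict:
    return {
        "remote_tag": response["to_tag"],
        # Record-Route is recorded top-down from the callee's side, so
        # the caller's route set is the reversed list.
        "route_set": list(reversed(response["record_route"])),
    }

resp_200 = {
    "to_tag": "a6c85cf",
    "record_route": ["<sip:p1.example.com;lr>", "<sip:p2.example.com;lr>"],
}
dialog = build_dialog_state(resp_200)
print(dialog["route_set"][0])  # <sip:p2.example.com;lr>
```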

Related Guides


Compare all the tools and pricing on our main comparison table, check the cheapest tools guide for budget options, or see the enterprise guide for organizational procurement and data governance considerations.

Related Posts