|
|
| THIS_FILE = Path(__file__).resolve() |
|
| PROJECT_ROOT = THIS_FILE.parent |
|
| page_title |
|
| layout |
|
| page_icon |
|
| ndlrb = st.sidebar.selectbox("Bandwidth (NDLRB)", [6, 15, 25, 50], index=0) |
|
| normal_cp = st.sidebar.checkbox("Normal CP", value=True) |
|
| num_subframes = st.sidebar.number_input("Broj subfrejmova", 4, 10, 4) |
|
| n_id_2 = st.sidebar.selectbox("Cell ID Group (N_ID_2)", [0, 1, 2], index=1) |
|
| pbch_on = st.sidebar.checkbox("Pošalji MIB (PBCH)", value=True) |
|
| mib_mode = "manual" |
|
| mib_man = st.sidebar.text_input("24 bita", "1010"*6) |
|
| channel_enabled = st.sidebar.checkbox("Channel Active (Uključi smetnje)", value=True, help="Ako je isključeno, signal ide direktno TX->RX (idealan prenos).") |
|
| snr_db = st.sidebar.slider("SNR (dB)", -25.0, 35.0, 20.0, 1.0, disabled=not channel_enabled) |
|
| cfo_hz = st.sidebar.number_input("CFO (Hz)", -5000.0, 5000.0, 0.0, step=100.0, disabled=not channel_enabled) |
|
| seed = st.sidebar.number_input("RNG Seed", 0, 9999, 42) |
|
| rx_enabled = st.sidebar.checkbox("Receiver Active", value=True) |
|
| rx_corr = st.sidebar.checkbox("CFO Korekcija", value=True, disabled=not rx_enabled) |
|
| rx_descramble = st.sidebar.checkbox("Descrambling", value=True, disabled=not rx_enabled, help="Uključi/isključi de-scrambling bitova.") |
|
| rx_norm_pss = st.sidebar.checkbox("Normalize before PSS", value=True, disabled=not rx_enabled, help="Normalizuj signal prije korelacije.") |
|
| run_btn = st.sidebar.button("POKRENI SIMULACIJU", type="primary") |
| | cfg |
|
Dict[str, Any] | res = run_simulation(cfg) |
|
| c = st.session_state["cfg"] |
|
Dict[str, Any] | rx_out = res.get('rx_result') |
|
| col1 |
|
| col2 |
|
| col3 |
|
| col4 |
|
tuple | pss_ok = (rx_out.n_id_2_hat == c.tx.n_id_2) |
|
| delta |
|
Dict[str, Any] | est_cfo = rx_out.cfo_hat if rx_out.cfo_hat else 0.0 |
|
float | target_cfo = c.ch.freq_offset_hz if c.ch.enabled else 0.0 |
|
bool | has_bit_errors = False |
|
| tx_b_chk = np.array(res['mib_tx']).flatten().astype(int) |
|
| rx_b_chk = np.array(rx_out.mib_bits).flatten().astype(int) |
|
str | crc_text = "PASS" if rx_out.crc_ok else "FAIL" |
|
str | crc_subtext = "OK" |
|
str | crc_color = "normal" |
|
| delta_color |
| | tabs |
|
str | snr_status = f"{c.ch.snr_db} dB" if c.ch.enabled else "**Bypass (No Noise)**" |
|
| view_mode = st.radio("Tip prikaza", ["Energy (Magnitude)", "Resource Map (Structure)"], horizontal=True) |
|
Dict[str, Any] | grid = res["grid"] |
|
Dict[str, Any] | n_sym = grid.shape[1] |
|
| default_sym = int(res.get('pss_index', 0)) |
|
| sym_sel = st.slider("Odaberi OFDM Simbol", 0, n_sym - 1, default_sym) |
|
| fig |
|
| ax |
|
| figsize |
|
| im = ax.imshow(20*np.log10(np.abs(grid)+1e-12), aspect="auto", origin="lower", cmap='viridis') |
|
| color |
|
| linestyle |
|
| linewidth |
|
| label |
|
| curr_ndlrb = cfg.tx.ndlrb |
|
| curr_nid2 = cfg.tx.n_id_2 |
|
| curr_nsf = cfg.tx.num_subframes |
|
| curr_cp = cfg.tx.normal_cp |
|
int | curr_n_sc = curr_ndlrb * 12 |
|
int | curr_n_sym = curr_nsf * 14 |
|
np.ndarray | rmap = generate_resource_map((curr_n_sc, curr_n_sym), curr_ndlrb, curr_nid2, curr_nsf, curr_cp) |
|
list | cmap_colors = ['white', '#d62728', '#ff7f0e', '#1f77b4'] |
|
| cmap = ListedColormap(cmap_colors) |
|
list | bounds = [-0.5, 0.5, 1.5, 2.5, 3.5] |
|
| norm = BoundaryNorm(bounds, cmap.N) |
|
| fig_map |
|
| ax_map |
|
| aspect |
|
| origin |
|
| interpolation |
| list | patches |
|
| handles |
|
| bbox_to_anchor |
|
| loc |
|
| borderaxespad |
|
| sym_sel_bin = st.slider("Simbol", 0, grid.shape[1]-1, 0, key="bin_slider") |
|
np.ndarray | bins = build_ifft_input_bins(grid[:, sym_sel_bin], res["Nfft"]) |
|
int | mag_db = 20 * np.log10(np.abs(bins) + 1e-12) |
|
np.ndarray | current_types = rmap[:, sym_sel_bin] |
|
Dict[str, Any] | N = res["Nfft"] |
|
| type_bins = np.full(N, -1, dtype=int) |
|
Dict[str, Any] | num_sc = grid.shape[0] |
|
Dict[str, Any] | half = num_sc // 2 |
|
Dict[str, Any] | dc = N // 2 |
|
| pos_freq = np.arange(dc + 1, dc + 1 + half) |
|
| pos_sub = np.arange(half, num_sc) |
|
| neg_freq = np.arange(dc - half, dc) |
|
| neg_sub = np.arange(0, half) |
|
Dict[str, Any] | freq_idx = np.arange(N) - dc |
| dict | styles |
|
tuple | mask = (type_bins == t_code) |
|
| markerline |
|
| stemlines |
|
| baseline |
|
| markerfmt |
|
| basefmt |
|
| linefmt |
|
| markersize |
|
| alpha = 0.05 |
|
| bottom |
|
| True |
|
Dict[str, Any] | pss_corr = res['pss_corr'] |
|
| fig_pss |
|
| ax_pss |
|
| tau_hat |
|
Dict[str, Any] | syms = res.get('rx_pbch_symbols') |
|
| p_avg = np.mean(np.abs(syms)**2) |
|
list | corrected_syms = [] |
|
float | phase_acc = 0.0 |
|
| ideals = np.array([1+1j, 1-1j, -1+1j, -1-1j]) / np.sqrt(2) |
|
list | error_vectors = [] |
|
| s_rot = s * np.exp(-1j * phase_acc) |
|
tuple | dec = (np.sign(s_rot.real) + 1j * np.sign(s_rot.imag))/np.sqrt(2) |
|
| err = np.angle(s_rot * np.conj(dec)) |
|
| mse = np.mean(error_vectors) |
|
int | evm_rms = np.sqrt(mse) * 100 |
|
| syms_plot = np.array(corrected_syms) |
|
| fig_const |
|
| ax_const |
|
| imag |
|
| s |
|
| marker |
|
| tx_b = np.array(res['mib_tx']).flatten().astype(int) |
|
| rx_b = np.array(rx_out.mib_bits).flatten().astype(int) |
|
bool | sync_failed = False |
|
| ber = np.mean(tx_b != rx_b) |
|
| num_errs = np.sum(tx_b != rx_b) |
|
| title |
|
Dict[str, Any] | tx_w = res["tx_waveform"] |
|
Dict[str, Any] | rx_w = res["rx_waveform"] |
|
| max_start = max(0, tx_w.size - 1) |
|
| start_idx = st.slider("Start sample", 0, max_start, 0, key="time_start") |
|
| win_len = st.slider("Window length", 200, min(50000, tx_w.size), 5000, step=100, key="time_win") |
|
| end_idx = min(tx_w.size, start_idx + win_len) |
|
Dict[str, Any] | seg_tx = tx_w[start_idx:end_idx] |
|
Dict[str, Any] | seg_rx = rx_w[start_idx:end_idx] |
|
| x = np.arange(start_idx, end_idx) |
|
| axs |
|
| sharex |
|
| data |
|
| file_name |
|
| mime |
|
| fs = float(res["fs"]) |
|
| seg_len = st.selectbox("FFT segment length", [1024, 2048, 4096, 8192, 16384], index=2, key="spec_len") |
|
| S_tx = np.fft.fftshift(np.fft.fft(seg_tx)) |
|
| S_rx = np.fft.fftshift(np.fft.fft(seg_rx)) |
|
| f = np.fft.fftshift(np.fft.fftfreq(seg_len, d=1.0 / fs)) |
|
int | mag_tx_db = 20 * np.log10(np.abs(S_tx) + 1e-12) |
|
int | mag_rx_db = 20 * np.log10(np.abs(S_rx) + 1e-12) |
|
| c1 |
|
| c2 |
|
| sw_snr_start = st.number_input("SNR Start (dB)", -20.0, 30.0, -10.0) |
|
| sw_snr_end = st.number_input("SNR End (dB)", -20.0, 30.0, 10.0) |
|
| sw_snr_step = st.number_input("SNR Step", 1.0, 10.0, 2.0) |
|
| sw_cfo_start = st.number_input("CFO Start (Hz)", -2000.0, 2000.0, 0.0) |
|
| sw_cfo_end = st.number_input("CFO End (Hz)", -2000.0, 2000.0, 1000.0) |
|
| sw_cfo_step = st.number_input("CFO Step", 100.0, 1000.0, 500.0) |
|
| snr_vals = np.arange(sw_snr_start, sw_snr_end + 0.1, sw_snr_step) |
|
| cfo_vals = np.arange(sw_cfo_start, sw_cfo_end + 0.1, sw_cfo_step) |
|
list | results_list = [] |
|
| progress_bar = st.progress(0) |
|
| total_iter = len(snr_vals) * len(cfo_vals) |
|
int | curr_iter = 0 |
|
| orig_cfg = st.session_state["cfg"] |
|
| temp_ch = ChannelConfig(True, float(f_off), float(s), 42, 0.0) |
|
| temp_cfg = RunConfig(orig_cfg.tx, temp_ch, orig_cfg.rx) |
|
Dict[str, Any] | r = run_simulation(temp_cfg) |
|
Dict[str, Any] | rx_r = r.get('rx_result') |
|
str | crc_status = "OK" if (rx_r and rx_r.crc_ok) else "FAIL" |
|
Dict[str, Any] | tx_b_sw = res['mib_tx'] |
|
Dict[str, Any] | rx_b_sw = rx_r.mib_bits |
|
float | pss_peak = 0.0 |
|
Dict[str, Any] | cfo_est_val = rx_r.cfo_hat if (rx_r and rx_r.cfo_hat is not None) else np.nan |
|
| df_res = pd.DataFrame(results_list) |
|
| pivot_ber = df_res.pivot(index="SNR (dB)", columns="CFO (Hz)", values="BER") |
|
| fig_h |
|
| ax_h |
|
| rotation |
|
| cbar = fig_h.colorbar(im, ax=ax_h) |
LTE TX- Channel-RX
===============================================================================
Ova skripta pokreće Streamlit aplikaciju koja simulira kompletan LTE komunikacijski lanac:
Transmitter (TX) -> Channel (Kanal) -> Receiver (RX).
Funkcionalnosti:
- Napredna vizualizacija: Waveform, Spectrum, Grid Inspector, OFDM Bins, Constellation.
- RX Analiza: Digitalni prikaz bitova sa detekcijom grešaka.
- Sweeps: Masovna simulacija (SNR/CFO) sa tabelom rezultata i Heatmap.
- Kontrole: Channel Bypass, RX Disable, Descrambling, Normalization.
Pokretanje:
$ streamlit run gui_tx_channel.py