Digital Radio
Image by Goodtiming8871 CC-BY-SA 4.0 |
Most people learn how AM and FM radio works at some point or another. These are methods for transmitting and receiving an analogue signal, but nowadays most radio signals are digital. How is it possible to send bytes from one place to another using analogue electromagnetic radiation? I thought I'd write a post to try to demystify it.
Essentially digital radio consists of a bunch of stages in a pipeline. These stages can be grouped into a transmit pipeline followed by a receive pipeline. The output of each stage is the input of the next. The transmit pipeline output is the electromagnetic radiation, which should also be the most obfuscated output in the whole pipeline. If everything goes well the receive pipeline output will be an array of bytes that match the transmit pipeline input.
I'm going to discuss Quadrature Amplitude Modulation, or QAM. The key idea is that we have two analogue controls we can twiddle when transmitting a signal: the first is amplitude, like in AM; the second is phase, which is a bit like FM since in order to increase the phase you need to momentarily increase the frequency of the signal and, likewise, in order to decrease the phase you need to momentarily lower it. We should be able to use both of these when sending data. However, it is difficult to see how to split the bits we want to send between the phase and the amplitude as these seem like two very different controls. Luckily there's a different, but equivalent, pair of controls which are more symmetric in nature.
If you have a cosine and a sine generator then you can produce weighted sums of these signals $I\,cos(\omega t) + Q\,sin(\omega t)$ and this gives you two controls, namely the coefficients $I$ and $Q$. Let's write $I+iQ=Ae^{i\theta}$, then:
$$
\begin{align}
I\,cos(\omega t) + Q\,sin(\omega t) &= I\,\text{Re}\left( e^{-i\omega t} \right) + Q\,\text{Re}\left(i e^{-i\omega t} \right) \\
&=\text{Re}\left((I + iQ)e^{-i\omega t} \right)\\
&=\text{Re} \left( Ae^{i\theta}e^{-i\omega t}\right)\\
&=\text{Re} \left( Ae^{-i(\omega t - \theta)}\right)\\
&=A\,cos(\omega t - \theta)\\
\end{align}
$$
which shows that the $I$ and $Q$ controls are equivalent to the $A$ and $\theta$ controls. This simplifies the problem somewhat. Now, instead of having two very different looking controls $A$ and $\theta$ we have two very similar looking controls, namely the coefficient $I$ of the cosine generator and the coefficient $Q$ of the sine generator. This will make it a lot easier to split the bits we want to send.
We're now in a position to describe each stage in the pipeline, in order. Note that in a real implementation there will be many more stages than I am going to describe here as I have decided to keep it as simple as possible and only focus on the stages which are strictly related to radio. (For example, before the byte stream is converted into a stream of symbols it may be segmented into packets and expanded with error detecting or correcting bytes; after packets are converted into symbols they may have a correlation sequence prepended so that an unsynchronized receiver can find the start of the packet and correct its clock; and before (I,Q) values are compared with constallation points there will probably be an Automatic Gain Control stage.)
Byte to Symbol Stage
Since we have two controls $(I,Q)$ which we can vary with time we need to start off by splitting the input byte stream into two streams. This is done by treating the data as a bit stream rather than a stream of bytes and segmenting this input data into equal sized symbols where, critically, the number of bits in each symbol is even. For example, Suppose we want to send the text 'hello' using 6 bit symbols (i.e. QAM 64) then we'd gettext | 'hello!' |
---|---|
ASCII bytes (base16) | 68 65 6c 6c 6f 21 |
bits | 0110 1000 0110 0101 0110 1100 0110 1100 0110 1111 0010 0001 |
QAM 64 symbols (base2) | 011010 000110 010101 101100 011011 000110 111100 100001 |
QAM 64 symbols (base10) | 26 6 21 44 27 6 60 33 |
Now since each symbol is an even number of bits we can divide it in half and use each half to drive one of the controls
Symbol to $(I,Q)$ Sample Stage
Suppose we're using QAM16 (4 bit symbols) and we want to transmit the symbol $b_3b_2b_1b_0$ then we'll use $b_3b_2$ to drive $I$ and $b_1b_0$ to drive $Q$. This means we need to divide the range $[-I_{max},I_{max}]$ into 3 equal parts to give us 4 values, and similarly for $[-Q_{max},Q_{max}]$. Putting this together gives us the following constellation points, each of which is the representation of a different QAM16 symbol.$(I,Q)$ Sample to Baseband Frequency Stage
Suppose we have a symbol rate of 1 Msym/s. We can think of these constellation points as flashing in a random order one million times per second. But real signals like voltages are continuous and so a better mental model is a dot (which happens to be sampled one million times a second) moving smoothly around the constellation diagram.What frequencies exist in this baseband signal? Well clearly the band must include 0 Hz since it is possible to just send the same symbol repeatedly. The highest frequency corresponds to alternately sending the bottom left and the top right symbols, and travelling anti-clockwise around the constellation diagram. This would correspond to half a million rotations per second. However, you could also alternate between these symbols by travelling clockwise around the diagram and this would correspond to minus half a million rotations per second. So the baseband frequency band must include the range [-symbol_rate/2, +symbol_rate/2]. But if we're clever about how we travel from one symbol to the next we can to ensure that there are no frequencies outside that range.
The function
$$
\begin{align}
f(t) &= sinc(\omega_0 t) \\
&= \frac{sin(\omega_0 t)}{\omega_0 t}
\end{align}
$$
can be shown to have the following fourier transform
$$
\begin{align}
\tilde{f}(\omega) &= \int_{-\infty}^{\infty}e^{-i\omega t}f(t) dt \\
&= \begin{cases}
\frac{\pi}{\omega_0} &\text{for}\; \;\lvert \omega \rvert < \omega_0 \\
0 &\text{for}\;\; \lvert \omega \rvert > \omega_0 \\
\end{cases}
\end{align}
$$
So if we set $\omega_0 = \text{symbol_rate}\times\pi$ we get a function $f$ with no angular frequencies outside $[-\omega_0,\omega_0]$. Additionally this function has a zero at every multiple of the symbol period other than the one at $t = 0$:
If we shift $sinc(\omega_0 t)$ by a multiple of the symbol period $T$ we get a function $sinc(\omega_0 (t - nT))$ whose fourier transform is still zero outside $[-\omega_0,\omega_0]$. This means that we can scale and superpose multiple such functions without breaking the frequency band mask. But the really nice thing about this is that in the sum
$$
\sum_{n = -\infty}^{\infty} A_n\,sinc(\omega_0 (t - nT))
$$
only the $n$th summand contributes anything to the sample point $nT$. So this leads to a plan: To convert discrete per-symbol $(I,Q)$ samples into continuous $(I,Q)$ signals perform the following sums
$$
\begin{align}
I(t) &= \sum_{n = -\infty}^{\infty} I_n\,sinc(\omega_0 (t - nT))\\
Q(t) &= \sum_{n = -\infty}^{\infty} Q_n\,sinc(\omega_0 (t - nT))
\end{align}
$$
Typical baseband I and Q signals @ 1Msym/s vs time |
However, note that to avoid introducing an infinite delay in this stage of the transmit pipeline you have to replace $sinc$ with some approximation that is zero everywhere beyond a certain distance from the origin.
Baseband Frequency to Intermediate Frequency Stage
Unfortunately you can't just broadcast baseband frequency. For a start it would be illegal, but it wouldn't work even if it were legal since everyone doing this would trash eachothers' signals, and because the lowest and highest frequencies in the signal would be attenutated by very different amounts.In practice you will be given permission to transmit in a given frequency band, which will either be licensed (meaning you have bought an exclusive right in a geographical area) or unlicensed (meaning anyone can transmit, but you may be required to adhere to a specific protocol to handle contention). This means that we need to translate the signal from the $[-\omega_0, \omega_0]$ range to the right. How do we do this?
The simple answer is to multiply by cosine and sine generators to produce a new signal
$$
IF\_signal = I(t)\,cos(\omega_{IF}t) + Q(t)\,sin(\omega_{IF}t)
$$
It is easy to show that $IF\_signal$ has a fourier transform that is zero outside of $\pm[\omega_{IF}-\omega_0,\omega_{IF}+\omega_0]$. This is because according to the convolution theorem, the Fourier transform of the product of two functions is the convolution of the Fourier transforms of the two functions and we know a) that $I$ and $Q$ have transforms that are zero for $\omega > \lvert \omega_0 \rvert$ and b) that $cos(\omega_{IF}t)$ and $sin(\omega_{IF}t)$ have transforms which are sums of delta functions centred at $\pm\omega_{IF}$.
Starting off with a symbol rate of 1Msym/s and using an intermediate frequency of 10MHz gives a signal with the following frequency response.
I/F signal transform vs Hz, where I/F is 10MHz and symbol rate is 1MHz |
Now, you could in principle transmit this signal, but in practice it is difficult to implement firmware and hardware that step all the way up from baseband frequency to radio frequency in one go. This is why the output from this stage in the pipeline is called intermediate frequency. Thus there is usually one more major step before the signal is transmitted:
Intermediate Frequency to Radio Frequency Stage
In this stage the I/F signal is multiplied by a cosine generator whose frequency is the difference between that of the incoming I/F signal and the desired R/F signal.If you apply the convolution theorem again you find that this results in a signal with frequencies in two bands (I'm ignoring the negative frequency aliases from here on):
$$
\begin{align}
&[\omega_{RF}-\omega_0, \omega_{RF}+\omega_0]\\
&[\omega_{RF}-\omega_0, \omega_{RF}+\omega_0] -2\omega_{IF}
\end{align}
$$
The reason for this is that, unlike when we stepped up from baseband to I/F, when we step up from I/F to R/F the incoming signal is not a single band centered around zero, but instead consists of two bands centered around plus $\omega_{IF}$ and $-\omega_{IF}$.
What this means in practice is that you need to pass the signal through a high pass filter before you can transmit it onto the air.
Receive Pipeline
For each stage in the transmit pipeline there is an equivalent stage in the receive pipeline. We don't need to go into quite so much detail here since doing so would repeat a lot of what was said above. However, the highlights are- To get the R/F signal down to I/F you need to multiply the signal by a cosine generator and send through a low pass filter
- To convert the I/F signal to the $I(t),Q(t)$ baseband signals, multiply by $cos(\omega_0 t)$ (for $I$) and $sin(\omega_0 t)$ (for $Q$), and then integrate over a whole number of Intermediate Frequency periods.
Finally
- To convert the continuous $I(t),Q(t)$ baseband signals to per-symbol $I$, and $Q$ samples you need to sample each at the symbol centre. The subtlety I've omitted is: how does the receiver synchronize to the symbol_rate and how does it locate centre of the symbols? The answer to both is usually that the transmitter prepends packets with a predefined correlation sequence.
- To convert the $(I,Q)$ samples back to symbols just look up the nearest constellation point. I have omitted a detail here: the previous stages will have caused a somewhat unpredictable level of attenuation in the signal and so an AGC (Automatic Gain Control) will be necessary, possibly based on the correlation sequence.
- Finally the symbols can be converted to bits and the bitstream divided into bytes
I've haven't worked directly in MAC/modem design, so everything I've understood about it I've picked up by osmosis from colleagues that did. So, to test my understanding I quickly knocked up the following simulation code, which produces all of the diagrams used in this post
#!/usr/bin/env python import numpy as np class PipelineUnreadable(Exception): pass class Pipeline: ''' Base class for a pipeline where the writer writes list of objects of some kind and then closes, and the reader reads list of objects of possibly a different kind. An exception is raised by read() if the Pipeline is empty unless the Pipeline was previously closed in which case None is returned. ''' def __init__(self): ''' This should be overidden ''' self.output_pipe = [] self.closed = False def write(self, x): ''' This should be overidden ''' self.output_pipe.extend(x) def close(self): self.closed = True def read(self): if not self.output_pipe: if self.closed: return None raise PipelineUnreadable retval = self.output_pipe self.output_pipe = [] return retval class ByteToSymbolPipeline(Pipeline): ''' converts bytes to QAM symbols MSBit first. So p = BytesToQAMSymbolsPipeline(16) p.write(0x12) p.write(0x34) p.read() # returns 1 p.read() # returns 2 p.read() # returns 3 p.read() # returns 4 ''' def __init__(self, qam): '''qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...''' self.log2_qam = qam.bit_length() - 1 assert self.log2_qam & 1 == 0 assert (2**self.log2_qam) == qam self.qam = qam self.closed = False self.output_pipe = [] self.input_pipe = [] def write(self, _input): '_input can be a bytearray object or a list' for byte in _input: for bitpos in xrange(7,-1,-1): self.input_pipe.append(1 if (byte & (1<<bitpos)) else 0) # move one symbol at a time from input_pipe to output_pipe while True: if self.log2_qam > len(self.input_pipe): break symbol = 0 bits_remaining = self.log2_qam while bits_remaining: symbol <<= 1 symbol |= self.input_pipe.pop(0) bits_remaining -= 1 self.output_pipe.append(symbol) class SymbolToBytePipeline(Pipeline): ''' converts QAM symbols to Bytes MSBit first. So p = BytesToQAMSymbolsPipeline(16) p.write(1) p.write(2) p.write(3) p.write(4) p.read() # returns 0x12 p.read() # returns 0x34 ''' def __init__(self, qam): '''qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...''' self.log2_qam = qam.bit_length() - 1 assert self.log2_qam & 1 == 0 assert (2**self.log2_qam) == qam self.qam = qam self.closed = False self.output_pipe = bytearray() self.input_pipe = [] def write(self, _input): '_input must be a list of ints' for symbol in _input: for bitpos in xrange(self.log2_qam - 1,-1,-1): self.input_pipe.append(1 if (symbol & (1<<bitpos)) else 0) # move one symbol at a time from input_pipe to output_pipe while True: if 8 > len(self.input_pipe): break byte = 0 bits_remaining = 8 while bits_remaining: byte <<= 1 byte |= self.input_pipe.pop(0) bits_remaining -= 1 self.output_pipe.append(byte) class SymbolToIQPipeline(Pipeline): ''' Convert symbols into (I,Q) (a.k.a. "baseband") samples ''' def __init__(self, qam): ''' qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ... ''' self.log2_qam = qam.bit_length() - 1 assert self.log2_qam & 1 == 0 assert (2**self.log2_qam) == qam self.qam = qam sqrt_qam = 1<<(self.log2_qam/2) # self.I is a lookup from the 1st half of the symbol bits and likewise self.Q for the 2nd half self.I = self.Q = np.linspace(-1, 1, sqrt_qam) self.closed = False self.output_pipe = [] def write(self, _input): '_input must be a list of ints' # convert one symbol at a time for symbol in _input: # Split symbol into upper bits and lower bits I_symbol = symbol >> self.log2_qam/2 Q_symbol = symbol & ((1<<self.log2_qam/2)-1) # Convert upper bits into I_sample and lower bits into Q_sample I_sample = self.I[I_symbol] Q_sample = self.Q[Q_symbol] # Add (I,Q) sample to output self.output_pipe.append((I_sample, Q_sample)) class IQToSymbolPipeline(Pipeline): ''' Convert (I,Q) values (sampled at centre of symbol) into symbols ''' def __init__(self, qam): ''' qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ... ''' self.log2_qam = qam.bit_length() - 1 assert self.log2_qam & 1 == 0 assert (2**self.log2_qam) == qam self.qam = qam sqrt_qam = 1<<(self.log2_qam/2) # self.I is a lookup from the 1st half of the symbol bits and likewise self.Q for the 2nd half self.I = self.Q = np.linspace(-1, 1, sqrt_qam) self.closed = False self.output_pipe = [] # bodgy gain control - TODO make this another pipeline self.rms_expected = np.sqrt(sum(self.I**2 + self.Q**2) / len(self.I)) def write(self, _input): '_input must be a list of 2-tuples' # bodgy gain control - TODO make this another pipeline rms = np.sqrt(sum(map(lambda (I,Q): (I**2+Q**2), _input)) / len(_input)) scale = self.rms_expected / rms # convert I,Q samples to symbol one at a time for (I_sample, Q_sample) in _input: # bodgy gain control - TODO make this another pipeline I_sample *= scale Q_sample *= scale # Find I_symbol and Q_symbol which are closest I_sample_dists = np.abs(self.I - I_sample) Q_sample_dists = np.abs(self.Q - Q_sample) I_symbol = I_sample_dists.argmin() Q_symbol = Q_sample_dists.argmin() # reconstruct symbol from two halves symbol = (I_symbol << self.log2_qam/2) | Q_symbol # Add symbol to output_pipe self.output_pipe.append(symbol) class IQToBBPipeline(Pipeline): ''' Convert per-symbol (I,Q) values to baseband samples. ''' def __init__(self, symbol_rate_Hz=1e6, sample_rate_Hz=10e6, buflen=16*10): ''' @symbol_rate_Hz: the frequency of symbols passed to write() @sample_rate_Hz: the frequency of (I,Q) samples generated by read() @buflen: the number of samples each input (I,Q) sample affects NB: sample_rate_Hz and symbol_rate_Hz are not used directly, only their ratio is used. ''' # values of x to pass into the sinc(x) function self.buflen = buflen self.bb_samples_per_symbol = n = int(sample_rate_Hz / symbol_rate_Hz) x = np.arange(-float(buflen/2), float(buflen-buflen/2)) x /= n self.sinc = np.sinc(x) self.closed = False self.output_pipe = [] self.phase_next = 0. self.I_samples = np.zeros(self.buflen) self.Q_samples = np.zeros(self.buflen) # Do not return the first (buflen - bb_samples_per_symbol)/2 samples self.ignore_remaining = (buflen - n)/2 # How many samples to add to output_pipe on close() self.samples_delayed = 0 def write(self, _input): '_input must be a list of 2-tuples with one per symbol' # Place an I*self.sinc into the I output stream and likewise for the Q output stream. # self.sinc is nice because it returns to zero every self.bb_samples_per_symbol samples # except for one point which corresponds to the centre of the symbol where it is one. # convert per-symbol (I,Q) to BB samples one at a time for (I, Q) in _input: I_samples = I*self.sinc Q_samples = Q*self.sinc self.I_samples += I_samples self.Q_samples += Q_samples # save the first symbol's worth of the samples for the output_pipe and shift n = self.bb_samples_per_symbol l = self.buflen I_part1, I_part2 = self.I_samples[:n].copy(), self.I_samples[n:].copy() Q_part1, Q_part2 = self.Q_samples[:n].copy(), self.Q_samples[n:].copy() self.I_samples[:l - n] = I_part2 self.Q_samples[:l - n] = Q_part2 self.I_samples[l - n:] = 0 self.Q_samples[l - n:] = 0 # ignore the first few samples because buflen is greater than bb_samples_per_symbol self.samples_delayed += n if self.ignore_remaining: if self.ignore_remaining >= n: self.ignore_remaining -= n continue # next (I,Q) else: skip = self.ignore_remaining I_part1 = I_part1[skip:] Q_part1 = Q_part1[skip:] self.ignore_remaining -= skip # move the first symbol's worth of samples (minus any ignored) to the output_pipe self.output_pipe.extend(zip(I_part1, Q_part1)) self.samples_delayed -= len(I_part1) def close(self): # however many were skipped at start we need to now append to the output_pipe if self.ignore_remaining: skip = self.ignore_remaining l = self.buflen I_samples = self.I_samples[skip:].copy() Q_samples = self.Q_samples[skip:].copy() self.I_samples[:l - skip] = I_samples self.Q_samples[:l - skip] = Q_samples self.I_samples[l - skip:] = 0 self.Q_samples[l - skip:] = 0 n = self.samples_delayed I_samples = self.I_samples[:n].copy() Q_samples = self.Q_samples[:n].copy() self.output_pipe.extend(zip(I_samples, Q_samples)) self.closed = True class BBToIQPipeline(Pipeline): ''' Sample the baseband (I,Q) samples at the centre point of each symbol ''' def __init__(self, symbol_rate_Hz=1e6, sample_rate_Hz=10e6): ''' @symbol_rate_Hz: the frequency of symbols passed to write() @sample_rate_Hz: the frequency of (I,Q) samples generated by read() NB: sample_rate_Hz and symbol_rate_Hz are not used directly, only their ratio is used. ''' bb_samples_per_symbol = int(sample_rate_Hz / symbol_rate_Hz) self.down_counter = bb_samples_per_symbol / 2 self.down_counter_reset = bb_samples_per_symbol self.closed = False self.output_pipe = [] def write(self, _input): '_input must be a list of 2-tuples with one (I,Q) pair per BB sample' # convert per-symbol (I,Q) to BB samples one at a time for (I, Q) in _input: if self.down_counter == 0: self.down_counter = self.down_counter_reset self.output_pipe.append((I,Q)) self.down_counter -= 1 class BBToIFPipeline(Pipeline): ''' Convert baseband (I,Q) samples to I/F (Intermediate Frequency) samples. I samples are used to scale the output of a cosine generator. Q samples are used to scale the output of a sine generator. ''' def __init__(self, bb_sample_rate_Hz=10e6, if_sample_rate_Hz=100e6, if_frequency_Hz=10e6): ''' @bb_sample_rate_Hz: the frequency of (I,Q) samples passed to write() @if_sample_rate_Hz: the frequency of I/F samples generated by read() @if_frequency_Hz: the I/F frequency NB: if_sample_rate_Hz should be at least double if_frequency_Hz ''' self.closed = False self.output_pipe = [] self.normalized_phase_next = 0. # normalized means divided by 2*pi self.normalized_phase_per_if_sample = if_frequency_Hz / if_sample_rate_Hz self.normalized_phase_per_bb_sample = if_frequency_Hz / bb_sample_rate_Hz self.if_samples_per_bb_sample = if_sample_rate_Hz/bb_sample_rate_Hz self.if_samples_delayed = 0. def write(self, _input): '_input must be a list of 2-tuples with one (I,Q) pair per BB sample' # convert I,Q samples to I/F samples one at a time for (I, Q) in _input: f = self.if_samples_per_bb_sample + self.if_samples_delayed n = int(f) self.if_samples_delayed = f - n normalized_phases = np.arange(n) * self.normalized_phase_per_if_sample + self.normalized_phase_next phases = 2 * np.pi * normalized_phases if_samples = I * np.cos(phases) + Q * np.sin(phases) self.normalized_phase_next += n * self.normalized_phase_per_if_sample self.normalized_phase_next -= int(self.normalized_phase_next) # Add I/F samples to output_pipe self.output_pipe.extend(if_samples.tolist()) class IFToBBPipeline(Pipeline): ''' Convert I/F (Intermediate Frequency) samples to baseband (I,Q) samples. ''' def __init__(self, bb_sample_rate_Hz=10e6, if_sample_rate_Hz=100e6, if_frequency_Hz=10e6): ''' @bb_sample_rate_Hz: the frequency of (I,Q) samples passed to write() @if_sample_rate_Hz: the frequency of I/F samples generated by read() @if_frequency_Hz: the I/F frequency NB: if_sample_rate_Hz should be at least double if_frequency_Hz ''' self.closed = False self.input_pipe = [] self.output_pipe = [] self.normalized_phase_next = 0. # normalized means divided by 2*pi self.normalized_phase_per_if_sample = if_frequency_Hz / if_sample_rate_Hz self.normalized_phase_per_bb_sample = if_frequency_Hz / bb_sample_rate_Hz self.if_samples_per_bb_sample = if_sample_rate_Hz/bb_sample_rate_Hz self.if_samples_delayed = 0. def write(self, _input): '_input must be a list of IF samples' self.input_pipe.extend(_input) # convert I/F samples to BB (I,Q) samples one at a time f = self.if_samples_per_bb_sample + self.if_samples_delayed n = int(f) self.if_samples_delayed = f - n while True: if n > len(self.input_pipe): break # grab one bb sample's worth of I/F samples if_samples = self.input_pipe[:n] self.input_pipe = self.input_pipe[n:] normalized_phases = np.arange(n) * self.normalized_phase_per_if_sample + self.normalized_phase_next phases = normalized_phases * 2 * np.pi self.normalized_phase_next += n * self.normalized_phase_per_if_sample self.normalized_phase_next -= int(self.normalized_phase_next) # Infer baseband (I, Q) sample by integrating I = (sum(np.cos(phases) * if_samples)) * (2./n) Q = (sum(np.sin(phases) * if_samples)) * (2./n) self.output_pipe.append((I,Q)) from scipy import signal def butter_filter(data, cutoff, fs, btype='high', order=5): nyq = 0.5 * fs normal_cutoff = cutoff / nyq b, a = signal.butter(order, normal_cutoff, btype=btype, analog=False) return signal.filtfilt(b, a, data) class IFToRFPipeline(Pipeline): ''' Convert I/F (Intermediate Frequency) samples to R/F (Radio Frequency) samples. This is done by first multplying the I/F signal by a cosine generator. This produces two superposed signals with the same phase and half the amplitude: the first has frequency rf_frequency_Hz and the second rf_frequency_Hz - 2*if_frequency_Hz. The second stage is to filter out the low frequency signal. ''' def __init__(self, if_sample_rate_Hz=100e6, rf_sample_rate_Hz=1e9, if_frequency_Hz=10e6, rf_frequency_Hz=100e6): ''' @if_sample_rate_Hz: the frequency of I/F samples passed to write() @rf_sample_rate_Hz: the frequency of R/F samples generated by read() @if_frequency_Hz: the I/F frequency @rf_frequency_Hz: the R/F frequency NB: rf_sample_rate_Hz should be at least double rf_frequency_Hz ''' self.closed = False self.output_pipe = [] self.cosine_freq = rf_frequency_Hz - if_frequency_Hz self.normalized_phase_next = 0. # normalized means divided by 2*pi self.normalized_phase_per_rf_sample = self.cosine_freq / rf_sample_rate_Hz self.normalized_phase_per_if_sample = self.cosine_freq / if_sample_rate_Hz self.rf_samples_per_if_sample = rf_sample_rate_Hz/if_sample_rate_Hz self.rf_samples_delayed = 0. self.rf_sample_rate_Hz = rf_sample_rate_Hz self.rf_frequency_Hz = rf_frequency_Hz self.if_frequency_Hz = if_frequency_Hz def write(self, _input): '_input must be a list of I/F samples' # convert I/F samples to R/F samples one at a time rf_samples = [] for if_sample in _input: # create rf_samples by multiplying if_sample by cosine generator f = self.rf_samples_per_if_sample + self.rf_samples_delayed n = int(f) self.rf_samples_delayed = f - n normalized_phases = np.arange(n) * self.normalized_phase_per_rf_sample + self.normalized_phase_next phases = 2 * np.pi * normalized_phases rf_samples_extra = if_sample * np.cos(phases) self.normalized_phase_next += n * self.normalized_phase_per_rf_sample self.normalized_phase_next -= int(self.normalized_phase_next) # high pass filter rf_samples rf_samples.extend(rf_samples_extra) rf_samples_filtered = butter_filter(rf_samples, self.cosine_freq, self.rf_sample_rate_Hz) self.output_pipe.extend(rf_samples_filtered) class RFToIFPipeline(Pipeline): ''' Convert R/F (Radio Frequency) samples to I/F (Intermediate Frequency) samples. This is done by first multplying the R/F signal by a cosine generator. This produces two superposed signals with the same phase and half the amplitude: the first has frequency if_frequency_Hz and the second rf_frequency_Hz - 2*if_frequency_Hz. The second stage is to filter out the high frequency signal. ''' def __init__(self, if_sample_rate_Hz=100e6, rf_sample_rate_Hz=1e9, if_frequency_Hz=10e6, rf_frequency_Hz=100e6): ''' @if_sample_rate_Hz: the frequency of I/F samples passed to write() @rf_sample_rate_Hz: the frequency of R/F samples generated by read() @if_frequency_Hz: the I/F frequency @rf_frequency_Hz: the R/F frequency NB: rf_sample_rate_Hz should be at least double rf_frequency_Hz ''' cosine_freq = rf_frequency_Hz - if_frequency_Hz self.closed = False self.output_pipe = [] self.normalized_phase_per_rf_sample = cosine_freq / rf_sample_rate_Hz self.normalized_phase_per_if_sample = cosine_freq / if_sample_rate_Hz self.rf_samples_per_if_sample = rf_sample_rate_Hz/if_sample_rate_Hz self.rf_samples_delayed = 0. self.rf_sample_rate_Hz = rf_sample_rate_Hz self.rf_frequency_Hz = rf_frequency_Hz self.if_frequency_Hz = if_frequency_Hz def write(self, _input): '_input must be a list of I/F samples' # convert R/F samples to I/F samples rf_samples = _input normalized_phases = np.arange(len(_input)) * self.normalized_phase_per_rf_sample phases = 2 * np.pi * normalized_phases if_samples_at_rf_sample_rate = butter_filter(rf_samples*np.cos(phases), (self.rf_frequency_Hz + self.if_frequency_Hz)/2, self.rf_sample_rate_Hz, 'low') n = int(self.rf_samples_per_if_sample) self.output_pipe.extend(if_samples_at_rf_sample_rate[n/2::n]) class ThermalNoisePipeline(Pipeline): ''' Add Gaussian noise ''' def __init__(self, SNR_dB=20.): ''' SNR_dB is the resulting SNR ratio (in dB) after the noise has been added ''' self.closed = False self.output_pipe = [] # how much noise to add relative to input signal self.noise_to_amplitude_rms_ratio = 10.**(-SNR_dB/20.) # this tracks the mean square of the amplitude of the input signal self.amplitude_ms_lpf = 0. # this tells us how much to weigh the current amplitude_ms_lpf (the new amplitude_ms gets weight one) self.amplitude_ms_lpf_orig_weight = 0 # amplitude_ms_lpf_orig_weight is incremented by one each sample - but set a maximum self.amplitude_ms_lpf_orig_weight_max = 1000 def write(self, _input): '_input must be a list of samples' for sample in _input: # update self.amplitude_rms_lpf amplitude_ms = sample * sample self.amplitude_ms_lpf = (self.amplitude_ms_lpf*self.amplitude_ms_lpf_orig_weight + amplitude_ms) / (self.amplitude_ms_lpf_orig_weight + 1) if self.amplitude_ms_lpf_orig_weight < self.amplitude_ms_lpf_orig_weight_max: self.amplitude_ms_lpf_orig_weight += 1 # guess root mean square and noise_amplitude amplitude_rms = np.sqrt(self.amplitude_ms_lpf) noise_amplitude = self.noise_to_amplitude_rms_ratio * amplitude_rms # add sample to output_pipe but with some noise self.output_pipe.append(sample + noise_amplitude * np.random.randn()) if __name__ == '__main__': import scipy.fftpack import matplotlib.pyplot as plt plt.ion() # Configure the stages qam = 16 SNR_dB=20. symbol_rate_Hz =1e6 if_frequency_Hz =10e6 rf_frequency_Hz =100e6 bb_sample_rate_Hz = symbol_rate_Hz * 10 if_sample_rate_Hz = if_frequency_Hz * 10 rf_sample_rate_Hz = rf_frequency_Hz * 10 stages = [ { 'pipeline': ByteToSymbolPipeline(qam), 'output_name': 'tx_symbols', 'plot': False}, { 'pipeline': SymbolToIQPipeline(qam), 'output_name': 'tx_iq_values', 'plot': 'scatter'}, { 'pipeline': IQToBBPipeline(symbol_rate_Hz, bb_sample_rate_Hz), 'output_name': 'tx_bb_samples', 'plot': 'time', 'sample_rate': bb_sample_rate_Hz}, { 'pipeline': BBToIFPipeline(bb_sample_rate_Hz, if_sample_rate_Hz, if_frequency_Hz), 'output_name': 'tx_if_samples', 'plot': 'fft', 'sample_rate': if_sample_rate_Hz }, { 'pipeline': IFToRFPipeline(if_sample_rate_Hz, rf_sample_rate_Hz, if_frequency_Hz, rf_frequency_Hz), 'output_name': 'tx_rf_samples', 'plot': 'fft', 'sample_rate': rf_sample_rate_Hz }, { 'pipeline': ThermalNoisePipeline(SNR_dB), 'output_name': 'rx_rf_samples', 'plot': 'fft', 'sample_rate': rf_sample_rate_Hz }, { 'pipeline': RFToIFPipeline(if_sample_rate_Hz, rf_sample_rate_Hz, if_frequency_Hz, rf_frequency_Hz), 'output_name': 'rx_if_samples', 'plot': 'fft', 'sample_rate': if_sample_rate_Hz }, { 'pipeline': IFToBBPipeline(bb_sample_rate_Hz, if_sample_rate_Hz, if_frequency_Hz), 'output_name': 'rx_bb_samples', 'plot': 'time', 'sample_rate': bb_sample_rate_Hz}, { 'pipeline': BBToIQPipeline(symbol_rate_Hz, bb_sample_rate_Hz), 'output_name': 'rx_iq_values', 'plot': 'scatter'}, { 'pipeline': IQToSymbolPipeline(qam), 'output_name': 'rx_symbols', 'plot': False}, { 'pipeline': SymbolToBytePipeline(qam), 'output_name': 'rx_bytes', 'plot': False}, ] # Pass the following data through the transmit pipelines data = tx_bytes = bytearray('abcdefghijklmnopABCDEFGHIJKLMNOP') print "tx_bytes:\t", data for stage in stages: p = stage['pipeline'] p.write(data) p.close() data = p.read() output_name = stage['output_name'] globals()[output_name] = data # TODO move this to <pipeline_object>.plot() function if stage['plot'] == 'time': plt.figure() plt.title(output_name) N = len(data) T = N/stage['sample_rate'] t = np.linspace(0.0, T, N, endpoint=False) plt.xticks([10**np.floor(np.log10(T))]) plt.gca().set_yticks([]) plt.gca().spines['left'].set_position('zero') plt.gca().spines['right'].set_color('none') plt.gca().spines['bottom'].set_position('zero') plt.gca().spines['top'].set_color('none') variables = zip(*data) if type(data[0]) == tuple else [data] for variable in variables: plt.plot(t, variable) elif stage['plot'] == 'scatter': plt.figure() plt.title(output_name) plt.gca().set_xticks([]) plt.gca().set_yticks([]) plt.gca().spines['left'].set_position('zero') plt.gca().spines['right'].set_color('none') plt.gca().spines['bottom'].set_position('zero') plt.gca().spines['top'].set_color('none') (x,y) = zip(*data) plt.scatter(x,y) elif stage['plot'] == 'fft': plt.figure() plt.title('FFT(%s)' % output_name) plt.gca().set_yticks([]) rate = stage['sample_rate'] N = len(data) x = np.linspace(0.0, N/rate, N, endpoint=False) yf = scipy.fftpack.fft(data) xf = np.linspace(0.0, rate/2, N/2) plt.plot(xf, 2.0/N * np.abs(yf[:N//2])) else: print "%s:\t" % output_name, data # Check we received the output data we were expecting assert rx_bytes == tx_bytes
Comments
Post a Comment