Digital Radio

Image by Goodtiming8871 CC-BY-SA 4.0

Most people learn how AM and FM radio works at some point or another.  These are methods for transmitting and receiving an analogue signal, but nowadays most radio signals are digital.  How is it possible to send bytes from one place to another using analogue electromagnetic radiation?  I thought I'd write a post to try to demystify it.

Essentially digital radio consists of a bunch of stages in a pipeline.  These stages can be grouped into a transmit pipeline followed by a receive pipeline.  The output of each stage is the input of the next.  The transmit pipeline output is the electromagnetic radiation, which should also be the most obfuscated output in the whole pipeline.  If everything goes well the receive pipeline output will be an array of bytes that match the transmit pipeline input.

I'm going to discuss Quadrature Amplitude Modulation, or QAM.  The key idea is that we have two analogue controls we can twiddle when transmitting a signal: the first is amplitude, like in AM; the second is phase, which is a bit like FM since in order to increase the phase you need to momentarily increase the frequency of the signal and, likewise, in order to decrease the phase you need to momentarily lower it.  We should be able to use both of these when sending data.   However, it is difficult to see how to split the bits we want to send between the phase and the amplitude as these seem like two very different controls.  Luckily there's a different, but equivalent, pair of controls which are more symmetric in nature.

If you have a cosine and a sine generator then you can produce weighted sums of these signals $I\,cos(\omega t) + Q\,sin(\omega t)$ and this gives you two controls, namely the coefficients $I$ and $Q$.  Let's write $I+iQ=Ae^{i\theta}$, then:
$$
\begin{align}
I\,cos(\omega t) + Q\,sin(\omega t) &= I\,\text{Re}\left( e^{-i\omega t} \right) + Q\,\text{Re}\left(i e^{-i\omega t} \right) \\
&=\text{Re}\left((I + iQ)e^{-i\omega t} \right)\\
&=\text{Re} \left( Ae^{i\theta}e^{-i\omega t}\right)\\
&=\text{Re} \left( Ae^{-i(\omega t - \theta)}\right)\\
&=A\,cos(\omega t - \theta)\\
\end{align}
$$

which shows that the $I$ and $Q$ controls are equivalent to the $A$ and $\theta$ controls.  This simplifies the problem somewhat.  Now, instead of having two very different looking controls $A$ and $\theta$ we have two very similar looking controls, namely the coefficient $I$ of the cosine generator and the coefficient $Q$ of the sine generator.  This will make it a lot easier to split the bits we want to send.

We're now in a position to describe each stage in the pipeline, in order.  Note that in a real implementation there will be many more stages than I am going to describe here as I have decided to keep it as simple as possible and only focus on the stages which are strictly related to radio.  (For example, before the byte stream is converted into a stream of symbols it may be segmented into packets and expanded with error detecting or correcting bytes; after packets are converted into symbols they may have a correlation sequence prepended so that an unsynchronized receiver can find the start of the packet and correct its clock; and before (I,Q) values are compared with constallation points there will probably be an Automatic Gain Control stage.)

Byte to Symbol Stage

Since we have two controls $(I,Q)$ which we can vary with time we need to start off by splitting the input byte stream into two streams.  This is done by treating the data as a bit stream rather than a stream of bytes and segmenting this input data into equal sized symbols where, critically, the number of bits in each symbol is even.  For example, Suppose we want to send the text 'hello' using 6 bit symbols (i.e. QAM 64) then we'd get

 text   'hello!'
 ASCII bytes (base16)   68 65 6c 6c 6f 21
 bits  0110 1000 0110 0101 0110 1100 0110 1100 0110 1111 0010 0001 
 QAM 64 symbols (base2)   011010 000110 010101 101100 011011 000110 111100 100001
 QAM 64 symbols (base10)   26 6 21 44 27 6 60 33

Now since each symbol is an even number of bits we can divide it in half and use each half to drive one of the controls

Symbol to $(I,Q)$ Sample Stage

Suppose we're using QAM16 (4 bit symbols) and we want to transmit the symbol $b_3b_2b_1b_0$ then we'll use $b_3b_2$ to drive $I$ and $b_1b_0$ to drive $Q$.  This means we need to divide the range $[-I_{max},I_{max}]$  into 3 equal parts to give us 4 values, and similarly for $[-Q_{max},Q_{max}]$.  Putting this together gives us the following constellation points, each of which is the representation of a different QAM16 symbol.


$(I,Q)$ Sample to Baseband Frequency Stage

Suppose we have a symbol rate of 1 Msym/s.  We can think of these constellation points as flashing in a random order one million times per second.  But real signals like voltages are continuous and so a better mental model is a dot (which happens to be sampled one million times a second) moving smoothly around the constellation diagram.

What frequencies exist in this baseband signal?  Well clearly the band must include 0 Hz since it is possible to just send the same symbol repeatedly.  The highest frequency corresponds to alternately sending the bottom left and the top right symbols, and travelling anti-clockwise around the constellation diagram. This would correspond to half a million rotations per second.  However, you could also alternate between these symbols by travelling clockwise around the diagram and this would correspond to minus half a million rotations per second.  So the baseband frequency band must include the range [-symbol_rate/2, +symbol_rate/2].  But if we're clever about how we travel from one symbol to the next we can to ensure that there are no frequencies outside that range.

The function
$$
\begin{align}
f(t) &= sinc(\omega_0 t) \\
&= \frac{sin(\omega_0 t)}{\omega_0 t}
\end{align}
$$
can be shown to have the following fourier transform
$$
\begin{align}
\tilde{f}(\omega) &= \int_{-\infty}^{\infty}e^{-i\omega t}f(t) dt \\
&=  \begin{cases}
\frac{\pi}{\omega_0} &\text{for}\; \;\lvert \omega \rvert < \omega_0 \\
0 &\text{for}\;\; \lvert \omega \rvert > \omega_0 \\
\end{cases}
\end{align}
$$
So if we set $\omega_0 = \text{symbol_rate}\times\pi$ we get a function $f$ with no angular frequencies outside $[-\omega_0,\omega_0]$.  Additionally this function has a zero at every multiple of the symbol period other than the one at $t = 0$:

If we shift $sinc(\omega_0 t)$ by a multiple of the symbol period $T$ we get a function $sinc(\omega_0 (t - nT))$ whose fourier transform is still zero outside $[-\omega_0,\omega_0]$.  This means that we can scale and superpose multiple such functions without breaking the frequency band mask.  But the really nice thing about this is that in the sum
$$
\sum_{n = -\infty}^{\infty} A_n\,sinc(\omega_0 (t - nT))
$$
only the $n$th summand contributes anything to the sample point $nT$.  So this leads to a plan: To convert discrete per-symbol $(I,Q)$ samples into continuous $(I,Q)$ signals perform the following sums
$$
\begin{align}
I(t) &= \sum_{n = -\infty}^{\infty} I_n\,sinc(\omega_0 (t - nT))\\
Q(t) &= \sum_{n = -\infty}^{\infty} Q_n\,sinc(\omega_0 (t - nT))
\end{align}
$$
Typical baseband I and Q signals @ 1Msym/s vs time

However, note that to avoid introducing an infinite delay in this stage of the transmit pipeline you have to replace $sinc$ with some approximation that is zero everywhere beyond a certain distance from the origin.

Baseband Frequency to Intermediate Frequency Stage

Unfortunately you can't just broadcast baseband frequency.  For a start it would be illegal, but it wouldn't work even if it were legal since everyone doing this would trash eachothers' signals, and because the lowest and highest frequencies in the signal would be attenutated by very different amounts.

In practice you will be given permission to transmit in a given frequency band, which will either be licensed (meaning you have bought an exclusive right in a geographical area) or unlicensed (meaning anyone can transmit, but you may be required to adhere to a specific protocol to handle contention).  This means that we need to translate the signal from the $[-\omega_0, \omega_0]$ range to the right.  How do we do this?

The simple answer is to multiply by cosine and sine generators to produce a new signal
$$
IF\_signal = I(t)\,cos(\omega_{IF}t) + Q(t)\,sin(\omega_{IF}t)
$$
It is easy to show that $IF\_signal$ has a fourier transform that is zero outside of $\pm[\omega_{IF}-\omega_0,\omega_{IF}+\omega_0]$.  This is because according to the convolution theorem, the Fourier transform of the product of two functions is the convolution of the Fourier transforms of the two functions and we know a) that $I$ and $Q$ have transforms that are zero for $\omega > \lvert \omega_0 \rvert$ and b) that $cos(\omega_{IF}t)$ and $sin(\omega_{IF}t)$ have transforms which are sums of delta functions centred at $\pm\omega_{IF}$.

Starting off with a symbol rate of 1Msym/s and using an intermediate frequency of 10MHz gives a signal with the following frequency response.

I/F signal transform vs Hz, where I/F is 10MHz and symbol rate is 1MHz


Now, you could in principle transmit this signal, but in practice it is difficult to implement firmware and hardware that step all the way up from baseband frequency to radio frequency in one go.  This is why the output from this stage in the pipeline is called intermediate frequency.  Thus there is usually one more major step before the signal is transmitted:

Intermediate Frequency to Radio Frequency Stage

In this stage the I/F signal is multiplied by a cosine generator whose frequency is the difference between that of the incoming I/F signal and the desired R/F signal.

If you apply the convolution theorem again you find that this results in a signal with frequencies in two bands (I'm ignoring the negative frequency aliases from here on):
$$
\begin{align}
&[\omega_{RF}-\omega_0, \omega_{RF}+\omega_0]\\
&[\omega_{RF}-\omega_0, \omega_{RF}+\omega_0] -2\omega_{IF}
\end{align}
$$

The reason for this is that, unlike when we stepped up from baseband to I/F, when we step up from I/F to R/F the incoming signal is not a single band centered around zero, but instead consists of two bands centered around plus $\omega_{IF}$ and $-\omega_{IF}$.

What this means in practice is that you need to pass the signal through a high pass filter before you can transmit it onto the air.

Receive Pipeline

For each stage in the transmit pipeline there is an equivalent stage in the receive pipeline.  We don't need to go into quite so much detail here since doing so would repeat a lot of what was said above.  However, the highlights are

  • To get the R/F signal down to I/F you need to multiply the signal by a cosine generator and send through a low pass filter
  • To convert the I/F signal to the $I(t),Q(t)$ baseband signals, multiply by $cos(\omega_0 t)$ (for $I$) and $sin(\omega_0 t)$ (for $Q$), and then integrate over a whole number of Intermediate Frequency periods.
Here are the results of my simulation.  Note that I deliberately added some gaussian noise between the transmit and receive pipelines to emulate all the various sources of noise in the system
 Finally
  • To convert the continuous $I(t),Q(t)$ baseband signals to per-symbol $I$, and $Q$ samples you need to sample each at the symbol centre.  The subtlety I've omitted is: how does the receiver synchronize to the symbol_rate and how does it locate centre of the symbols?  The answer to both is usually that the transmitter prepends packets with a predefined correlation sequence.
  • To convert the $(I,Q)$ samples back to symbols just look up the nearest constellation point.  I have omitted a detail here: the previous stages will have caused a somewhat unpredictable level of attenuation in the signal and so an AGC (Automatic Gain Control) will be necessary, possibly based on the correlation sequence.
  • Finally the symbols can be converted to bits and the bitstream divided into bytes

I've haven't worked directly in MAC/modem design, so everything I've understood about it I've picked up by osmosis from colleagues that did.  So, to test my understanding I quickly knocked up the following simulation code, which produces all of the diagrams used in this post



#!/usr/bin/env python
import numpy as np

class PipelineUnreadable(Exception):
    pass

class Pipeline:
    '''
    Base class for a pipeline where the writer
    writes list of objects of some kind and then closes,
    and the reader reads list of objects of possibly a different
    kind.  An exception is raised by read() if the Pipeline is empty
    unless the Pipeline was previously closed in which case
    None is returned.
    '''
    def __init__(self):
        ''' This should be overidden '''
        self.output_pipe = []
        self.closed = False

    def write(self, x):
        ''' This should be overidden '''
        self.output_pipe.extend(x)
        
    def close(self):
        self.closed = True
        
    def read(self):
        if not self.output_pipe:
            if self.closed: return None
            raise PipelineUnreadable
        retval = self.output_pipe
        self.output_pipe = []
        return retval
    

class ByteToSymbolPipeline(Pipeline):
    '''
    converts bytes to QAM symbols MSBit first.
    So
      p = BytesToQAMSymbolsPipeline(16)
      p.write(0x12)
      p.write(0x34)
      p.read() # returns 1
      p.read() # returns 2
      p.read() # returns 3
      p.read() # returns 4
    '''
    def __init__(self, qam):
        '''qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...'''
        self.log2_qam = qam.bit_length() - 1
        assert self.log2_qam & 1 == 0
        assert (2**self.log2_qam) == qam
        self.qam = qam
        self.closed = False
        self.output_pipe = []
        self.input_pipe = []

    def write(self, _input):
        '_input can be a bytearray object or a list'
        for byte in _input:
            for bitpos in xrange(7,-1,-1):
                self.input_pipe.append(1 if (byte & (1<<bitpos)) else 0)

        # move one symbol at a time from input_pipe to output_pipe
        while True:
            if self.log2_qam > len(self.input_pipe):
                break
            symbol = 0
            bits_remaining = self.log2_qam
            while bits_remaining:
                symbol <<= 1
                symbol |= self.input_pipe.pop(0)
                bits_remaining -= 1
            self.output_pipe.append(symbol)
            
class SymbolToBytePipeline(Pipeline):
    '''
    converts QAM symbols to Bytes MSBit first.
    So
      p = BytesToQAMSymbolsPipeline(16)
      p.write(1)
      p.write(2)
      p.write(3)
      p.write(4)
      p.read() # returns 0x12
      p.read() # returns 0x34
    '''
    def __init__(self, qam):
        '''qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...'''
        self.log2_qam = qam.bit_length() - 1
        assert self.log2_qam & 1 == 0
        assert (2**self.log2_qam) == qam
        self.qam = qam
        self.closed = False
        self.output_pipe = bytearray()
        self.input_pipe = []

    def write(self, _input):
        '_input must be a list of ints'
        for symbol in _input:
            for bitpos in xrange(self.log2_qam - 1,-1,-1):
                self.input_pipe.append(1 if (symbol & (1<<bitpos)) else 0)

        # move one symbol at a time from input_pipe to output_pipe
        while True:
            if 8 > len(self.input_pipe):
                break
            byte = 0
            bits_remaining = 8
            while bits_remaining:
                byte <<= 1
                byte |= self.input_pipe.pop(0)
                bits_remaining -= 1
            self.output_pipe.append(byte)

class SymbolToIQPipeline(Pipeline):
    '''
    Convert symbols into (I,Q) (a.k.a. "baseband") samples
    '''
    def __init__(self, qam):
        '''
        qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...
        '''
        self.log2_qam = qam.bit_length() - 1
        assert self.log2_qam & 1 == 0
        assert (2**self.log2_qam) == qam
        self.qam = qam
        sqrt_qam = 1<<(self.log2_qam/2)

        # self.I is a lookup from the 1st half of the symbol bits and likewise self.Q for the 2nd half
        self.I = self.Q = np.linspace(-1, 1, sqrt_qam)

        self.closed = False
        self.output_pipe = []

    def write(self, _input):
        '_input must be a list of ints'

        # convert one symbol at a time
        for symbol in _input:
            
            # Split symbol into upper bits and lower bits
            I_symbol = symbol >> self.log2_qam/2
            Q_symbol = symbol & ((1<<self.log2_qam/2)-1)

            # Convert upper bits into I_sample and lower bits into Q_sample
            I_sample = self.I[I_symbol]
            Q_sample = self.Q[Q_symbol]

            # Add (I,Q) sample to output
            self.output_pipe.append((I_sample, Q_sample))

class IQToSymbolPipeline(Pipeline):
    '''
    Convert (I,Q) values (sampled at centre of symbol) into symbols
    '''
    def __init__(self, qam):
        '''
        qam should be an even power of 2 over 1, i.e. 2**2, 2**4, 2**6, ...
        '''
        self.log2_qam = qam.bit_length() - 1
        assert self.log2_qam & 1 == 0
        assert (2**self.log2_qam) == qam
        self.qam = qam
        sqrt_qam = 1<<(self.log2_qam/2)

        # self.I is a lookup from the 1st half of the symbol bits and likewise self.Q for the 2nd half
        self.I = self.Q = np.linspace(-1, 1, sqrt_qam)

        self.closed = False
        self.output_pipe = []

        # bodgy gain control - TODO make this another pipeline
        self.rms_expected = np.sqrt(sum(self.I**2 + self.Q**2) / len(self.I))


    def write(self, _input):
        '_input must be a list of 2-tuples'

        # bodgy gain control - TODO make this another pipeline
        rms = np.sqrt(sum(map(lambda (I,Q): (I**2+Q**2), _input)) / len(_input))
        scale = self.rms_expected / rms

        # convert I,Q samples to symbol one at a time
        for (I_sample, Q_sample) in _input:

            # bodgy gain control - TODO make this another pipeline
            I_sample *= scale
            Q_sample *= scale
            
            # Find I_symbol and Q_symbol which are closest
            I_sample_dists = np.abs(self.I - I_sample)
            Q_sample_dists = np.abs(self.Q - Q_sample)
            I_symbol = I_sample_dists.argmin()
            Q_symbol = Q_sample_dists.argmin()

            # reconstruct symbol from two halves
            symbol = (I_symbol << self.log2_qam/2) | Q_symbol

            # Add symbol to output_pipe
            self.output_pipe.append(symbol)

class IQToBBPipeline(Pipeline):
    '''
    Convert per-symbol (I,Q) values to baseband samples.
    '''
    def __init__(self, symbol_rate_Hz=1e6, sample_rate_Hz=10e6, buflen=16*10):
        '''
        @symbol_rate_Hz: the frequency of symbols passed to write()
        @sample_rate_Hz: the frequency of (I,Q) samples generated by read()
        @buflen:         the number of samples each input (I,Q) sample affects

        NB: sample_rate_Hz and symbol_rate_Hz are not used directly, only their ratio is used.
        '''
        # values of x to pass into the sinc(x) function
        self.buflen = buflen
        self.bb_samples_per_symbol = n = int(sample_rate_Hz / symbol_rate_Hz)
        x = np.arange(-float(buflen/2), float(buflen-buflen/2))
        x /= n
        self.sinc = np.sinc(x)

        self.closed = False
        self.output_pipe = []
        self.phase_next = 0.
        self.I_samples = np.zeros(self.buflen)
        self.Q_samples = np.zeros(self.buflen)

        # Do not return the first (buflen - bb_samples_per_symbol)/2 samples
        self.ignore_remaining = (buflen - n)/2

        # How many samples to add to output_pipe on close()
        self.samples_delayed = 0

    def write(self, _input):
        '_input must be a list of 2-tuples with one per symbol'

        # Place an I*self.sinc into the I output stream and likewise for the Q output stream.
        # self.sinc is nice because it returns to zero every self.bb_samples_per_symbol samples
        # except for one point which corresponds to the centre of the symbol where it is one.

        # convert per-symbol (I,Q) to BB samples one at a time
        for (I, Q) in _input:
            I_samples = I*self.sinc
            Q_samples = Q*self.sinc
            self.I_samples += I_samples
            self.Q_samples += Q_samples
             
            # save the first symbol's worth of the samples for the output_pipe and shift
            n = self.bb_samples_per_symbol
            l = self.buflen
            I_part1, I_part2 = self.I_samples[:n].copy(), self.I_samples[n:].copy()
            Q_part1, Q_part2 = self.Q_samples[:n].copy(), self.Q_samples[n:].copy()
            self.I_samples[:l - n] = I_part2
            self.Q_samples[:l - n] = Q_part2
            self.I_samples[l - n:] = 0
            self.Q_samples[l - n:] = 0

            # ignore the first few samples because buflen is greater than bb_samples_per_symbol
            self.samples_delayed += n
            if self.ignore_remaining:
                if self.ignore_remaining >= n:
                    self.ignore_remaining -= n
                    continue # next (I,Q)
                else:
                    skip = self.ignore_remaining
                    I_part1 = I_part1[skip:]
                    Q_part1 = Q_part1[skip:]
                    self.ignore_remaining -= skip

            # move the first symbol's worth of samples (minus any ignored) to the output_pipe
            self.output_pipe.extend(zip(I_part1, Q_part1))
            self.samples_delayed -= len(I_part1)


    def close(self):
        # however many were skipped at start we need to now append to the output_pipe
        if self.ignore_remaining:
            skip = self.ignore_remaining
            l = self.buflen
            I_samples = self.I_samples[skip:].copy()
            Q_samples = self.Q_samples[skip:].copy()
            self.I_samples[:l - skip] = I_samples
            self.Q_samples[:l - skip] = Q_samples
            self.I_samples[l - skip:] = 0
            self.Q_samples[l - skip:] = 0
            
        n = self.samples_delayed
        I_samples = self.I_samples[:n].copy()
        Q_samples = self.Q_samples[:n].copy()
        self.output_pipe.extend(zip(I_samples, Q_samples))
        self.closed = True


class BBToIQPipeline(Pipeline):
    '''
    Sample the baseband (I,Q) samples at the centre point of each symbol
    '''
    def __init__(self, symbol_rate_Hz=1e6, sample_rate_Hz=10e6):
        '''
        @symbol_rate_Hz: the frequency of symbols passed to write()
        @sample_rate_Hz: the frequency of (I,Q) samples generated by read()

        NB: sample_rate_Hz and symbol_rate_Hz are not used directly, only their ratio is used.
        '''
        bb_samples_per_symbol = int(sample_rate_Hz / symbol_rate_Hz)
        self.down_counter = bb_samples_per_symbol / 2
        self.down_counter_reset = bb_samples_per_symbol
        self.closed = False
        self.output_pipe = []

    def write(self, _input):
        '_input must be a list of 2-tuples with one (I,Q) pair per BB sample'

        # convert per-symbol (I,Q) to BB samples one at a time
        for (I, Q) in _input:
            if self.down_counter == 0:
                self.down_counter = self.down_counter_reset
                self.output_pipe.append((I,Q))
            self.down_counter -= 1
        
class BBToIFPipeline(Pipeline):
    '''
    Convert baseband (I,Q) samples to I/F (Intermediate Frequency) samples.
    I samples are used to scale the output of a cosine generator.
    Q samples are used to scale the output of a sine generator.
    '''
    def __init__(self, bb_sample_rate_Hz=10e6, if_sample_rate_Hz=100e6, if_frequency_Hz=10e6):
        '''
        @bb_sample_rate_Hz: the frequency of (I,Q) samples passed to write()
        @if_sample_rate_Hz: the frequency of I/F samples generated by read()
        @if_frequency_Hz:   the I/F frequency

        NB: if_sample_rate_Hz should be at least double if_frequency_Hz
        '''
        self.closed = False
        self.output_pipe = []
        self.normalized_phase_next = 0. # normalized means divided by 2*pi
        self.normalized_phase_per_if_sample = if_frequency_Hz / if_sample_rate_Hz
        self.normalized_phase_per_bb_sample = if_frequency_Hz / bb_sample_rate_Hz
        self.if_samples_per_bb_sample = if_sample_rate_Hz/bb_sample_rate_Hz
        self.if_samples_delayed = 0.

    def write(self, _input):
        '_input must be a list of 2-tuples with one (I,Q) pair per BB sample'
        # convert I,Q samples to I/F samples one at a time
        for (I, Q) in _input:
            f = self.if_samples_per_bb_sample + self.if_samples_delayed
            n = int(f)
            self.if_samples_delayed = f - n
            normalized_phases = np.arange(n) * self.normalized_phase_per_if_sample + self.normalized_phase_next
            phases = 2 * np.pi * normalized_phases
            if_samples = I * np.cos(phases) + Q * np.sin(phases)
            self.normalized_phase_next += n * self.normalized_phase_per_if_sample
            self.normalized_phase_next -= int(self.normalized_phase_next)
            
            # Add I/F samples to output_pipe
            self.output_pipe.extend(if_samples.tolist())

class IFToBBPipeline(Pipeline):
    '''
    Convert I/F (Intermediate Frequency) samples to baseband (I,Q) samples.
    '''
    def __init__(self, bb_sample_rate_Hz=10e6, if_sample_rate_Hz=100e6, if_frequency_Hz=10e6):
        '''
        @bb_sample_rate_Hz: the frequency of (I,Q) samples passed to write()
        @if_sample_rate_Hz: the frequency of I/F samples generated by read()
        @if_frequency_Hz:   the I/F frequency

        NB: if_sample_rate_Hz should be at least double if_frequency_Hz
        '''
        self.closed = False
        self.input_pipe = []
        self.output_pipe = []
        self.normalized_phase_next = 0. # normalized means divided by 2*pi
        self.normalized_phase_per_if_sample = if_frequency_Hz / if_sample_rate_Hz
        self.normalized_phase_per_bb_sample = if_frequency_Hz / bb_sample_rate_Hz
        self.if_samples_per_bb_sample = if_sample_rate_Hz/bb_sample_rate_Hz
        self.if_samples_delayed = 0.

    def write(self, _input):
        '_input must be a list of IF samples'
        self.input_pipe.extend(_input)

        # convert I/F samples to BB (I,Q) samples one at a time
        f = self.if_samples_per_bb_sample + self.if_samples_delayed
        n = int(f)
        self.if_samples_delayed = f - n
        while True:
            if n > len(self.input_pipe):
                break

            # grab one bb sample's worth of I/F samples
            if_samples = self.input_pipe[:n]
            self.input_pipe = self.input_pipe[n:]
 
            normalized_phases = np.arange(n) * self.normalized_phase_per_if_sample + self.normalized_phase_next
            phases = normalized_phases * 2 * np.pi
            self.normalized_phase_next += n * self.normalized_phase_per_if_sample
            self.normalized_phase_next -= int(self.normalized_phase_next)

            # Infer baseband (I, Q) sample by integrating
            I = (sum(np.cos(phases) * if_samples)) * (2./n)
            Q = (sum(np.sin(phases) * if_samples)) * (2./n)
            self.output_pipe.append((I,Q))

from scipy import signal
def butter_filter(data, cutoff, fs, btype='high', order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = signal.butter(order, normal_cutoff, btype=btype, analog=False)
    return signal.filtfilt(b, a, data)

class IFToRFPipeline(Pipeline):
    '''
    Convert I/F (Intermediate Frequency) samples to R/F (Radio Frequency) samples.

    This is done by first multplying the I/F signal by a cosine generator.
    This produces two superposed signals with the same phase and half the amplitude:
    the first has frequency rf_frequency_Hz and the second rf_frequency_Hz - 2*if_frequency_Hz.
    The second stage is to filter out the low frequency signal.
    '''
    def __init__(self, if_sample_rate_Hz=100e6, rf_sample_rate_Hz=1e9, if_frequency_Hz=10e6, rf_frequency_Hz=100e6):
        '''
        @if_sample_rate_Hz: the frequency of I/F samples passed to write()
        @rf_sample_rate_Hz: the frequency of R/F samples generated by read()
        @if_frequency_Hz:   the I/F frequency
        @rf_frequency_Hz:   the R/F frequency

        NB: rf_sample_rate_Hz should be at least double rf_frequency_Hz
        '''
        self.closed = False
        self.output_pipe = []
        self.cosine_freq = rf_frequency_Hz - if_frequency_Hz
        self.normalized_phase_next = 0. # normalized means divided by 2*pi
        self.normalized_phase_per_rf_sample = self.cosine_freq / rf_sample_rate_Hz
        self.normalized_phase_per_if_sample = self.cosine_freq / if_sample_rate_Hz
        self.rf_samples_per_if_sample = rf_sample_rate_Hz/if_sample_rate_Hz
        self.rf_samples_delayed = 0.
        self.rf_sample_rate_Hz = rf_sample_rate_Hz
        self.rf_frequency_Hz = rf_frequency_Hz
        self.if_frequency_Hz = if_frequency_Hz

    def write(self, _input):
        '_input must be a list of I/F samples'
        # convert I/F samples to R/F samples one at a time
        rf_samples = []
        for if_sample in _input:

            # create rf_samples by multiplying if_sample by cosine generator
            f = self.rf_samples_per_if_sample + self.rf_samples_delayed
            n = int(f)
            self.rf_samples_delayed = f - n
            normalized_phases = np.arange(n) * self.normalized_phase_per_rf_sample + self.normalized_phase_next
            phases = 2 * np.pi * normalized_phases
            rf_samples_extra = if_sample * np.cos(phases)
            self.normalized_phase_next += n * self.normalized_phase_per_rf_sample
            self.normalized_phase_next -= int(self.normalized_phase_next)

            # high pass filter rf_samples
            rf_samples.extend(rf_samples_extra)

        rf_samples_filtered = butter_filter(rf_samples, self.cosine_freq, self.rf_sample_rate_Hz)
        self.output_pipe.extend(rf_samples_filtered)

class RFToIFPipeline(Pipeline):
    '''
    Convert R/F (Radio Frequency) samples to I/F (Intermediate Frequency) samples.

    This is done by first multplying the R/F signal by a cosine generator.
    This produces two superposed signals with the same phase and half the amplitude:
    the first has frequency if_frequency_Hz and the second rf_frequency_Hz - 2*if_frequency_Hz.
    The second stage is to filter out the high frequency signal.
    '''
    def __init__(self, if_sample_rate_Hz=100e6, rf_sample_rate_Hz=1e9, if_frequency_Hz=10e6, rf_frequency_Hz=100e6):
        '''
        @if_sample_rate_Hz: the frequency of I/F samples passed to write()
        @rf_sample_rate_Hz: the frequency of R/F samples generated by read()
        @if_frequency_Hz:   the I/F frequency
        @rf_frequency_Hz:   the R/F frequency

        NB: rf_sample_rate_Hz should be at least double rf_frequency_Hz
        '''
        cosine_freq = rf_frequency_Hz - if_frequency_Hz
        self.closed = False
        self.output_pipe = []
        self.normalized_phase_per_rf_sample = cosine_freq / rf_sample_rate_Hz
        self.normalized_phase_per_if_sample = cosine_freq / if_sample_rate_Hz
        self.rf_samples_per_if_sample = rf_sample_rate_Hz/if_sample_rate_Hz
        self.rf_samples_delayed = 0.
        self.rf_sample_rate_Hz = rf_sample_rate_Hz
        self.rf_frequency_Hz = rf_frequency_Hz
        self.if_frequency_Hz = if_frequency_Hz

    def write(self, _input):
        '_input must be a list of I/F samples'
        # convert R/F samples to I/F samples
        rf_samples = _input
        normalized_phases = np.arange(len(_input)) * self.normalized_phase_per_rf_sample
        phases = 2 * np.pi * normalized_phases
        if_samples_at_rf_sample_rate = butter_filter(rf_samples*np.cos(phases), (self.rf_frequency_Hz + self.if_frequency_Hz)/2, self.rf_sample_rate_Hz, 'low')

        n = int(self.rf_samples_per_if_sample)
        self.output_pipe.extend(if_samples_at_rf_sample_rate[n/2::n])
                
class ThermalNoisePipeline(Pipeline):
    '''
    Add Gaussian noise
    '''
    def __init__(self, SNR_dB=20.):
        '''
        SNR_dB is the resulting SNR ratio (in dB) after the noise has been added
        '''
        self.closed = False
        self.output_pipe = []
        
        # how much noise to add relative to input signal
        self.noise_to_amplitude_rms_ratio = 10.**(-SNR_dB/20.)

        # this tracks the mean square of the amplitude of the input signal
        self.amplitude_ms_lpf = 0.

        # this tells us how much to weigh the current amplitude_ms_lpf (the new amplitude_ms gets weight one)
        self.amplitude_ms_lpf_orig_weight = 0

        # amplitude_ms_lpf_orig_weight is incremented by one each sample - but set a maximum
        self.amplitude_ms_lpf_orig_weight_max = 1000

    def write(self, _input):
        '_input must be a list of samples'
        for sample in _input:
            # update self.amplitude_rms_lpf
            amplitude_ms = sample * sample
            self.amplitude_ms_lpf = (self.amplitude_ms_lpf*self.amplitude_ms_lpf_orig_weight + amplitude_ms) / (self.amplitude_ms_lpf_orig_weight + 1)
            if self.amplitude_ms_lpf_orig_weight < self.amplitude_ms_lpf_orig_weight_max:
                self.amplitude_ms_lpf_orig_weight += 1

            # guess root mean square and noise_amplitude
            amplitude_rms = np.sqrt(self.amplitude_ms_lpf)
            noise_amplitude = self.noise_to_amplitude_rms_ratio * amplitude_rms

            # add sample to output_pipe but with some noise
            self.output_pipe.append(sample + noise_amplitude * np.random.randn())

if __name__ == '__main__':

    import scipy.fftpack
    import matplotlib.pyplot as plt
    plt.ion()

    # Configure the stages
    qam = 16
    SNR_dB=20.
    symbol_rate_Hz  =1e6
    if_frequency_Hz =10e6
    rf_frequency_Hz =100e6
    bb_sample_rate_Hz = symbol_rate_Hz * 10
    if_sample_rate_Hz = if_frequency_Hz * 10
    rf_sample_rate_Hz = rf_frequency_Hz * 10

    stages = [
        { 'pipeline': ByteToSymbolPipeline(qam),
          'output_name': 'tx_symbols',    'plot': False},
        { 'pipeline': SymbolToIQPipeline(qam),
          'output_name': 'tx_iq_values',  'plot': 'scatter'},
        { 'pipeline': IQToBBPipeline(symbol_rate_Hz, bb_sample_rate_Hz),
          'output_name': 'tx_bb_samples', 'plot': 'time', 'sample_rate': bb_sample_rate_Hz},
        { 'pipeline': BBToIFPipeline(bb_sample_rate_Hz, if_sample_rate_Hz, if_frequency_Hz),
          'output_name': 'tx_if_samples', 'plot': 'fft', 'sample_rate': if_sample_rate_Hz },
        { 'pipeline': IFToRFPipeline(if_sample_rate_Hz, rf_sample_rate_Hz, if_frequency_Hz, rf_frequency_Hz),
          'output_name': 'tx_rf_samples', 'plot': 'fft', 'sample_rate': rf_sample_rate_Hz },
        { 'pipeline': ThermalNoisePipeline(SNR_dB),
          'output_name': 'rx_rf_samples', 'plot': 'fft', 'sample_rate': rf_sample_rate_Hz },
        { 'pipeline': RFToIFPipeline(if_sample_rate_Hz, rf_sample_rate_Hz, if_frequency_Hz, rf_frequency_Hz),
          'output_name': 'rx_if_samples', 'plot': 'fft', 'sample_rate': if_sample_rate_Hz },
        { 'pipeline': IFToBBPipeline(bb_sample_rate_Hz, if_sample_rate_Hz, if_frequency_Hz),
          'output_name': 'rx_bb_samples', 'plot': 'time', 'sample_rate': bb_sample_rate_Hz},
        { 'pipeline': BBToIQPipeline(symbol_rate_Hz, bb_sample_rate_Hz),
          'output_name': 'rx_iq_values',  'plot': 'scatter'},
        { 'pipeline': IQToSymbolPipeline(qam),
          'output_name': 'rx_symbols',    'plot': False},
        { 'pipeline': SymbolToBytePipeline(qam),
          'output_name': 'rx_bytes',      'plot': False},
    ]

    # Pass the following data through the transmit pipelines
    data = tx_bytes = bytearray('abcdefghijklmnopABCDEFGHIJKLMNOP')
    print "tx_bytes:\t", data

    for stage in stages:
        p = stage['pipeline']
        p.write(data)
        p.close()
        data = p.read()
        output_name = stage['output_name']
        globals()[output_name] = data

        # TODO move this to <pipeline_object>.plot() function
        if stage['plot'] == 'time':
            plt.figure()
            plt.title(output_name)
            N = len(data)
            T = N/stage['sample_rate']
            t = np.linspace(0.0, T, N, endpoint=False)
            plt.xticks([10**np.floor(np.log10(T))])
            plt.gca().set_yticks([])
            plt.gca().spines['left'].set_position('zero')
            plt.gca().spines['right'].set_color('none')
            plt.gca().spines['bottom'].set_position('zero')
            plt.gca().spines['top'].set_color('none')
            variables = zip(*data) if type(data[0]) == tuple else [data]
            for variable in variables: plt.plot(t, variable)
        elif stage['plot'] == 'scatter':
            plt.figure()
            plt.title(output_name)
            plt.gca().set_xticks([])
            plt.gca().set_yticks([])
            plt.gca().spines['left'].set_position('zero')
            plt.gca().spines['right'].set_color('none')
            plt.gca().spines['bottom'].set_position('zero')
            plt.gca().spines['top'].set_color('none')
            (x,y) = zip(*data)
            plt.scatter(x,y)
        elif stage['plot'] == 'fft':
            plt.figure()
            plt.title('FFT(%s)' % output_name)
            plt.gca().set_yticks([])
            rate = stage['sample_rate']
            N = len(data)
            x = np.linspace(0.0, N/rate, N, endpoint=False)
            yf = scipy.fftpack.fft(data)
            xf = np.linspace(0.0, rate/2, N/2)
            plt.plot(xf, 2.0/N * np.abs(yf[:N//2]))
        else:
            print "%s:\t" % output_name, data

    # Check we received the output data we were expecting
    assert rx_bytes == tx_bytes

Comments

Popular posts from this blog

How To Make ASCII Diagrams Beautifuller

Why growth is falling in all developed countries (as a long term trend)

Three ways to look at the Bell/GHZ experiment