© Karan Singh Garewal 2020
K. S. GarewalPractical Blockchains and Cryptocurrencieshttps://doi.org/10.1007/978-1-4842-5893-4_8

8. The Helium Blockchain

Karan Singh Garewal1 
(1)
Toronto, ON, Canada
 

In this chapter, we continue forward from the last chapter and begin the construction of the Helium blockchain. Firstly, we will create the interface for the cryptographic functions that are used by Helium, and next we will develop some of Helium’s blockchain functionality.

If you are primarily interested in the theory of blockchains and cryptocurrencies, please feel free to proceed to the next theory chapter.

Python Crypto Packages

Helium makes extensive use of cryptographic functions, especially SHA-256, RIPEMD-160, and digital signatures. This functionality is available in the Python pycryptodome package. So, let us install this package. Ensure that you are at the root of your virtual Helium environment and that the virtual environment is active. Do
$(virtual) pip install pycryptodome

We will also need some ancillary Python modules. Firstly, we will be using Python’s re module. re is included in the standard Python distribution, so there is no need to install it. It is Python’s regular expression parser. re examines strings for the presence or absence of specified sub-strings and can also perform text substitutions.

Next, we must install the Base58 module. Base58 converts a very large string into a restricted alphanumeric string. The Base58 alphanumeric character set is
123456789ABCDEFGHJKLMNPQRSTUVWXYzabcdefghijkmnopqrstuvwxyz

A Base58-encoded string only includes characters from the preceding set.

The Base58 character set differs from the ordinary alphanumeric character set, in that it excludes characters that can be easily confused. The excluded characters are I (capital I), l (lowercase L), O (capital o), and 0 (zero).1

We will also be using the pdb debugger. pdb enables us to set breakpoints in source code, step through our source code line by line, and examine stack frames and the values of variables.2 pdb is part of the Python standard distribution, so there is no need to install it through pip. We will always import the pdb module into our Helium source code files.

rcrypt Module Walkthrough

The following module file, rcrypt.py, is the Helium interface into the cryptographic package pycryptodome. Copy the following rcrypt code into a file named rcrypt.py and save this file in crypt sub-directory.3

We are now ready for a code walkthrough of the rcrypt interface module:
"""
The rcrypt module implements various cryptographic functions that are required by
the Helium cryptocurrency application
This module requires the pycryptodome package to be installed.
The base58 package encodes strings into base58 format.
This module uses Python's regular expression module re.
This module uses the secrets module in the Python standard library to generate
cryptographically secure hexadecimal encoded strings.
"""
# import the regular expressions module
import re
# imports from the cryptodome package
from Crypto.Hash import SHA3_256
from Crypto.PublicKey import ECC
from Crypto.PublicKey import DSA
from Crypto.Signature import DSS
from Crypto.Hash import SHA256
from Crypto.Hash import RIPEMD160
import base58
import secrets
# import Python's debugging and logging modules
import pdb
import logging
"""
   log debugging messages to the file debug.log
"""
logging.basicConfig(filename="debug.log",filemode="w", \
  format='%(asctime)s:%(levelname)s:%(message)s', level=logging.DEBUG)
def make_SHA256_hash(msg: 'string') -> 'string':
    """
    make_sha256_hash computes the SHA-256 message digest or cryptographic hash
    for a received string argument. The secure hash value that is generated is converted
    into a sequence of hexadecimal digits and then returned by the function.
    The hexadecimal format of the message digest is 64 bytes long.
    """
    #convert the received msg string to a sequence of ascii bytes
    message = bytes(msg, 'ascii')
    # compute the SHA-256 message digest of msg and convert to a hexadecimal format
    hash_object = SHA256.new()
    hash_object.update(message)
    return hash_object.hexdigest()
def validate_SHA256_hash(digest: "string") -> bool:
    """
    validate_SHA256_hash: tests whether a string has an encoding conforming to a
    SHA-256 message digest in hexadecimal string format (64 bytes).
    """
    # a hexadecimal SHA256 message digest must be 64 bytes long.
    if len(digest) != 64: return False
    # This regular expression tests that the received string contains only
    # hexadecimal characters
    if re.search('[^0-9a-fA-F]', digest) == None: return True
    return False
def make_RIPEMD160_hash(message: 'byte stream') -> 'string':
    """
    RIPEMD-160 is a cryptographic algorithm that emits a 20 byte message digest.
    This function computes the RIPEMD-160 message digest of a message and returns
    the hexadecimal string encoded representation of the message digest (40 bytes).
    """
    # convert message to an ascii byte stream
    bstr = bytes(message, 'ascii')
    # generate the RIPEMD hash of message
    h = RIPEMD160.new()
    h.update(bstr)
    # convert to a hexadecimal encoded string
    hash = h.hexdigest()
    return hash
def validate_RIPEMD160_hash(digest: 'string') -> 'bool':
    """
    tests that a received string has an encoding conforming to a RIPE160 hash in
    hexadecimal format
    """
    if len(digest) != 40: return False
    # This regular expression tests that received string only contains
    # hexadecimal characters
    if re.search('[^0-9a-fA-F]+', digest) == None: return True
    return False
def make_ecc_keys():
    """
    make a private-public key pair using the elliptic curve cryptographic
    functions in the pycryptodome package.
    returns a tuple with the private key and public key in PEM format
    """
    # generate an ecc object
    ecc_key = ECC.generate(curve='P-256')
    # get the public key object
    pk_object = ecc_key.public_key()
    # export the private-public key pair in PEM format
    p = (ecc_key.export_key(format='PEM'), pk_object.export_key(format='PEM'))
    return p
def sign_message(private_key: 'String', message: 'String') -> 'string':
    """
    digitally signs a message using a private key generated using the
    elliptic curve cryptography module of the pycryptodome package.
    Receives a private key in PEM format and the message that is to be
    digitally signed.
    returns a hex encoded signature string.
    """
    # import the PEM format private key
    priv_key = ECC.import_key(private_key)
    # convert the message to a byte stream and
    # compute the SHA-256 message digest of the message
    bstr = bytes(message, 'ascii')
    hash   = SHA256.new(bstr)
    # create a digital signature object from the private key
    signer = DSS.new(priv_key, 'fips-186-3')
    # sign the SHA-256 message digest.
    signature = signer.sign(hash)
    sig = signature.hex()
    return sig
def verify_signature(public_key: 'String', msg: 'String', signature: 'string') -> 'bool':
    """
    tests whether a message is digitally signed by a private key to which a
    public key is paired.
    Receives a ECC public key in PEM format, the message that is to to be verified
    and the digital signature of the message.
    Returns True or False
    """
    try:
         # convert the message to a byte stream and compute the SHA-256 hash
         msg = bytes(msg, 'ascii')
         msg_hash = SHA256.new(msg)
         # signature to bytes
         signature = bytes.fromhex(signature)
         # import the PEM formatted public key and create a signature verifier
         # object from the public key
         pub_key  = ECC.import_key(public_key)
         verifier = DSS.new(pub_key, 'fips-186-3')
         # Verify the authenticity of the signed message
         verifier.verify(msg_hash, signature)
         return True
    except Exception as err:
        logging.debug('verify_signature: exception: ' + str(err))
def make_address(prefix: 'string') -> 'string':
    """
    generates a Helium address from a ECC public key in PEM format.
    prefix is a single numeric character which describes the type of
    the address. This prefix must be '1'
    """
    key = ECC.generate(curve='P-256')
    __private_key = key.export_key(format='PEM')
    public_key = key.public_key().export_key(format='PEM')
    val = make_SHA256_hash(public_key)
    val = make_RIPEMD160_hash(val)
    tmp = prefix + val
    # make a checksum
    checksum = make_SHA256_hash(tmp)
    checksum = checksum[len(checksum) - 4:]
    # add the checksum to the tmp result
    address = tmp + checksum
    # encode addr as a base58 sequence of bytes
    address =  base58.b58encode(address.encode())
    # The decode function converts a byte sequence to a string
    address = address.decode("ascii")
    return address
def validate_address(address: 'string') -> bool:
    """
     validates a Helium address using the four character checksum appended
     to the address. Receives a base58 encoded address.
     """
    # encode the string address as a sequence of bytes
    addr = address.encode("ascii")
    # reverse the base58 encoding of the address
    addr = base58.b58decode(addr)
    # convert the address into a string
    addr = addr.decode("ascii")
    # length must be RIPEMD-160 hash length + length of checksum + 1
    if (len(addr) != 45): return False
    if (addr[0] != '1'):  return False
    # extract the checksum
    extracted_checksum = addr[len(addr) - 4:]
    # extract the checksum out of addr and compute the
    # SHA-256 hash of the remaining addr string
    tmp = addr[:len(addr)- 4]
    tmp = make_SHA256_hash(tmp)
    # get the computed checksum from tmp
    checksum = tmp[len(tmp) - 4:]
    if extracted_checksum  == checksum: return True
    return False
def make_uuid() -> 'string':
    """
    makes an universally unique 256 bit id encoded as a hexadecimal string that is
    used as a transaction identifier. Uses the Python standard library secrets module to
    generate a cryptographic strong random 32 byte string encoded as a hexadecimal
    string (64 bytes)
    """
    id = secrets.token_hex(32)
    return id

A Pytest Primer

Unit tests provide us with assurances of code correctness. Consequently, we should always strive to provide a high amount of coverage of our code with unit tests. Conceptually, a unit test tests a small discrete piece of code. This piece of code is typically a function, but it can also be something small like an expression or a collection of expressions. We will be using the Python Pytest module to perform unit testing on our modules.

If you are not acquainted with the Pytest unit test framework, you can read Appendix 3. Appendix 3 is a comprehensive tutorial on how to use Pytest.

All of the Pytest files for Helium are available in the last appendix of this book.

rcrypt Unit Tests

Install the Pytest module from the root of your Helium virtual environment:
$(virtual) pip install pytest
Create a file called test_rcrypt.py in the unit_tests directory of your virtual environment, and then copy the following source code into this file. Traverse to the unit_tests directory and run the tests with
$(virtual) pytest test_rcrypt.py -s
The -s qualifier is optional. It provides expanded output:
"""
pytest unit tests for the rcrypt module
"""
import pytest
import rcrypt
import pdb
@pytest.mark.parametrize("input_string, value", [
    ('hello world', True),
    ('the quick brown fox jumped over the sleeping dog', True),
    ('0', True),
    ('', True)
])
def test_make_SHA256_hash(input_string, value):
    """
    test that a SHA-256 message digest is created
    """
    ret = rcrypt.make_SHA256_hash(input_string)
    assert rcrypt.validate_SHA256_hash(ret) == value
@pytest.mark.parametrize("input_string, value", [
    ('a silent night and a pale paper moon over the saskatchewan river', 64),
    ('Farmer Brown and the sheep-dog', 64),
    ('', 64)
])
def test_SHA256_hash_length(input_string, value):
    """
    test that the length of created SHA3-256 hash in hexadecimal format is
    is 64 bytes
    """
    assert len(rcrypt.make_SHA256_hash('hello world')) == 64
@pytest.mark.parametrize("digest, value", [
    ("123", False),
    ("644bcc7e564373040999aac89e7622f3ca71fba1d972fd94a31c3bfbf24e3938", True),
    ("644bcc7e564373040999aac89e7622f3ca71fba1d972fd94a31c3bfbf24e39380", False),
    ("644bcc7e564373040999aac89e7622f3ca71fba1d972fd94a31cZbfbf24e3938", False),
    ('', False),
    ("644bcc7e564373040999aac89e7622f3caz1fba1d972fd94a31c3bfbf24e3938", False),
])
def test_validate_SHA256_hash(digest, value):
    """
    validate whether a valid SHA-256 message digest format is generated
    """
    assert rcrypt.validate_SHA256_hash(digest) == value
@pytest.mark.parametrize("string_input, value", [
    ("0", True),
    ('andromeda galaxy', True),
    ('Farmer Brown and the sheep-dog', True),
    ('0', True)
])
def test_make_RIPEMD160_hash(string_input, value):
    """
    validate that a valid RIPEMD-160 message digest format is generated
    """
    ret = rcrypt.make_RIPEMD160_hash(string_input)
    assert rcrypt.validate_RIPEMD160_hash(ret) == value
@pytest.mark.parametrize("hash, value", [
    ('123456789098765432169Q156c79FFa1200CCB1A', False),
    ("lkorkflfor4flgofmr", False),
    ('off0099ijf87', False),
    ('1234567890A87654321690156c79FFa1200CCB1A', True),
])
def test_validate_RIPEMD160_hash(hash, value):
    """
    validate whether a string is in RIPEMD-160 message digest format
    """
    assert rcrypt.validate_RIPEMD160_hash(hash) == value
def test_make_ecc_keys():
    """
    test ECC private-public key pair
    """
    ecc_keys = rcrypt.make_ecc_keys()
    assert ecc_keys[0].find("BEGIN PRIVATE KEY") >= 0
    assert ecc_keys[0].find("END PRIVATE KEY") >= 0
    assert ecc_keys[0].find("END PRIVATE KEY") > ecc_keys[0].find("BEGIN PRIVATE KEY")
    assert ecc_keys[1].find("BEGIN PUBLIC KEY") >= 0
    assert ecc_keys[1].find("END PUBLIC KEY") >= 0
    assert ecc_keys[1].find("END PUBLIC KEY") > ecc_keys[1].find("BEGIN PUBLIC KEY")
@pytest.mark.parametrize("message, value", [
    ('50bfee49c706d766411777aac1c9f35456c33ecea2afb4f3c8f1033b0298bdc9', True),
    ('6e6404d8693aea119a8ef7cf82c56fe96555d8df2c36c2b9e325411fbef62014', True),
    ('ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad', True),
    ('hello world', True) ,
])
def test_sign_message(message, value):
    """
    Digitally sign a message and then verify it
    """
    ecc_tuple = rcrypt.make_ecc_keys()
    priv_key  = ecc_tuple[0]
    pub_key   = ecc_tuple[1]
    sig = rcrypt.sign_message(priv_key, message)
    ret = rcrypt.verify_signature(pub_key, message, sig)
    assert ret == value
@pytest.mark.parametrize("prefix, value", [
    ("a", False),
    ("1", True),
    ("Q", False),
    ("qwerty", False),
    ("", False),
])
def test_make_address(prefix, value):
    """
    test the generation of Helium addresses from seeds
    """
    ret = rcrypt.make_address(prefix)
    assert rcrypt.validate_address(ret) == value
@pytest.mark.parametrize("length, ret", [
    (64, True),
    (32, False)
])
def test_make_uuid(length, ret) :
    """
    test id generation
    """
    id = rcrypt.make_uuid()
    assert (len(id) == length) == ret

You should see 33 unit tests passing.

The Python Logger

You will notice that we make extensive use of the Python logger to assist us in debugging the source code. The following example demonstrates its usage:4
import logging
logging.basicConfig(filename="debug.log", format='%(asctime)s:%(levelname)s:%(message)s',
    level=logging.DEBUG)
 ...
def adder(x,y):
    z = x + y
    logging.debug("This is a debug message")
    logging.info("Informational message, output = " + str(z))
    if z != x + y:
        logging.error("An error has happened!")
    return z
The parameters of the logging.basicConfig function direct that all logging output is to be written to the debug.log file and that all log messages with a severity level of logging.DEBUG or higher are to be logged. Log levels in order of increasing severity are
 DEBUG, INFO, WARNING, ERROR, CRITICAL

The second parameter of the logger is optional; it specifies the format of the logger’s output.

Once we have set the logger’s configuration, we can specify a log message for this logger as follows:
logger.SEVERITY_LEVEL("Some String %s: %s", message_1, message_2)

SEVERITY_LEVEL is a log level: DEBUG, INFO ... CRITICAL.

This will, for example, generate output such as this:
 2020-02-01 16:05:03,969:SEVERITY_LEVEL:validate_block: nonce is negative
By default, the logger appends messages to a log file. We can instruct the logger to overwrite an existing log file by specifying an optional filemode parameter:
logging.basicConfig(filename="debug.log", filemode='w', level=DEBUG)

Helium Block Structure

In this section, we are going to discuss the structure of a Helium block. From Chapter 6, you will recall that a blockchain is an ordered, immutable collection of blocks that have cryptographic validation. Each Helium block is a container for transactions.

A block is the following dictionary data structure (the angle brackets describe the type of the block attribute):
               {
                  "prevblockhash":   <string>
                  "version":         <string>
                  "timestamp":       <integer>
                  "difficulty_bits": <integer>
                  "nonce":           <integer>
                  "merkle_root":     <string>
                  "height":          <integer>
                  "tx":              <list>
              }

In Python, an integer is a signed integer. Python 3 eliminates the difference between integers and long integers. Integers can have any size and are only limited by memory. Python can perform arithmetic operations on integers of unlimited size.

prevblockhash is the SHA-256 message digest of the previous block header in string hexadecimal format. This value is used to test whether the previous block has been tampered with. The prevblockhash value of the genesis block is the empty string.

The subset of block attributes prevblockhash, version, timestamp, difficulty_bits, nonce, and merkle_root is called the block header. The block header does not include the list of transactions. Because the merkle root is included in the header, we can test the validity of the entire block, inclusive of the transactions, by computing the SHA-256 message digest of the header only.

version is the version number of Helium to which this block pertains. This attribute is set in the Helium configuration module and is “1”.

timestamp is the Unix timestamp (epoch time) of when the block was created. Epoch time is the number of seconds that have elapsed since midnight, January 1, 1970, Greenwich Mean Time or UTC. It marks the birth of Unix.

difficulty_bits is the DIFFICULTY_BITS as defined in the Helium configuration module. This is the value when the block was mined. Note that the DIFFICULTY_BITS parameter is periodically varied as the average time to mine a block is adjusted toward ten minutes.

nonce is a number used in the Helium mining algorithm.

merkle_root is a SHA-256 hexadecimal string. The merkle root lets us ensure that the transactions in the block have not been tampered with. It also lets us determine if a particular transaction is present in the block. We will discuss this value in a subsequent chapter.

height is the height of this block. Blocks in a blockchain are ordered in conformity with the distributed consensus algorithm and can be visualized as being stacked one on top of another. The first block, which is the genesis block, has height 0.

tx is a list of transactions. The number of transactions in the list is constrained by the maximum permissible size of a block. In the Helium configuration module, this value is set to 1 MB.

Now that we have defined a block, we can specify a Helium blockchain:
blockchain = []

The blockchain is a list and each list element is a block.5

Helium Blockchain Walkthrough

We are now ready to walk through our Helium blockchain code. Copy the following program code into a file named hblockchain.py and copy this file into the block directory. This is not the complete program code for the blockchain module since transactions and database operations have been abstracted. Since we have not developed the code for transaction and database modules as of yet, we mock operations pertaining to these modules with synthetic values.

The Helium blockchain code uses the json and pickle modules. Both of these modules are part of the Python standard library, and thus, there is no need to install them. JSON is a data interchange standard that can encode Python objects into strings and then decode these strings back into the corresponding objects. Pickle serializes objects by converting objects into a sequence of bytes and then writing this sequence to a file. This process is called serialization. Pickle can deserialize files to recover the original objects. If you have never used Pickle before, Appendix 5 provides a concise introduction to Pickle usage.

The add_block function adds a block to the Helium blockchain. This function receives a block. The function checks whether the block attributes have valid values. In particular, it verifies that the transactions included in the block are valid. In the event that the block is invalid, the function returns with a false value. If the block is valid, the function serializes the block to a raw byte file through the Pickle module. This block is then added to the blockchain list in memory.

The serialize_block function serializes a block to the data directory using Pickle. The file name is constructed as follows:
filename = "block_" + block_height + ".dat"

The read_block function receives an index into the blockchain and returns the block corresponding to the index. An exception will be thrown if the index is out of bounds.

The blockheader_hash function receives a block and returns the SHA-256 message digest of the block header as a hexadecimal string. Observe again that since the merkle root can be used to test whether the transactions in the block have been tampered with, we do not have to compute the SHA-256 cryptographic hash of the block over the entire block contents. It is sufficient to calculate this value over only the block header since it includes the merkle root.

The validate_block function tests whether a block has valid values. This function also validates the previous block by comparing SHA-256 message digests.

The final program code listing for hblockchain as well as the other Helium modules is available in Appendix 10 of this book:
"""
hbockchain.py: This module creates and maintains the Helium blockchain
This is a partial implementation of the module
"""
import rcrypt
import hconfig
import json
import pickle
import pdb
import logging
import os
"""
log debugging messages to the file debug.log
"""
logging.basicConfig(filename="debug.log",filemode="w",\
    format='%(asctime)s:%(levelname)s:%(message)s',level=logging.DEBUG)
"""
A block is a Python dictionary that has the following
structure. The type of an attribute is denoted in angle delimiters.
               {
                  "prevblockhash":   <string>
                  "version":         <string>
                  "timestamp":       <integer>
                  "difficulty_bits": <integer>
                  "nonce":           <integer>
                  "merkle_root":     <string>
                  "height":          <integer>
                  "tx":              <list>
              }
The blockchain is a list where each list element is a block
This is also referred to as the primary blockchain when used
by miners.
"""
blockchain   = []
def add_block(block: "dictionary") -> "bool":
    """
    add_block: adds a block to the blockchain. Receives a block.
    The block attributes are checked for validity and each transaction in the block is
    tested for validity. If there are no errors, the block is written to a file as a
    sequence of raw bytes. Then the block is added to the blockchain.
    returns True if the block is added to the blockchain and False otherwise
    """
    try:
        # validate the received block parameters
        if validate_block(block) == False:
            raise(ValueError("block validation error"))
        # serialize the block to a file
        if (serialize_block(block) == False):
                raise(ValueError("serialize block error"))
        # add the block to the blockchain in memory
        blockchain.append(block)
    except Exception as err:
        print(str(err))
        logging.debug('add_block: exception: ' + str(err))
        return False
    return True
def serialize_block(block: "dictionary") -> "bool":
    """
    serialize_block: serializes a block to a file using pickle.
    Returns True if the block is serialized and False otherwise.
    """
    index = len(blockchain)
    filename = "block_" + str(index) + ".dat"
    # create the block file and serialize the block
    try:
        f = open(filename, 'wb')
        pickle.dump(block, f)
    except Exception as error:
        logging.debug("Exception: %s: %s", "serialize_block", error)
        f.close()
        return False
    f.close()
    return True
def read_block(blockno: 'long') -> "dictionary or False":
    """
    read_block: receives an index into the Helium blockchain.
    Returns a block or False if the block does not exist.
    """
    try:
        block = blockchain[blockno]
        return block
    except Exception as error:
        logging.debug("Exception: %s: %s", "read_block", error)
        return False
    return block
def blockheader_hash(block: 'dictionary') -> "False or String":
    """
    blockheader_hash: computes and returns SHA-256 message digest of a block header
    as a hexadecimal string.
    Receives a block those blockheader hash is to be computed.
    Returns False if there is an error, otherwise returns a SHA-256 hexadecimal string.
    The block header consists of the following block fields:
    (1) version, (2)previous block hash, (3) merkle root
    (4) timestamp, (5) difficulty_bits, and (6) nonce.
    """
    try:
        hash = rcrypt.make_SHA256_hash(block['version'] + block['prevblockhash'] +
                                  block['merkle_root'] + str(block['timestamp']) +
                                  str(block['difficulty_bits']) + str(block['nonce']))
    except Exception as error:
        logging.debug("Exception:%s: %s", "blockheader_hash", error)
        return False
    return hash
def validate_block(block: "dictionary") -> "bool":
    """
    validate_block: receives a block and verifies that all its attributes have
    valid values.
    Returns True if the block is valid and False otherwise.
    """
    try:
        if type(block) != dict:
            raise(ValueError("block type error"))
        # validate scalar block attributes
        if type(block["version"]) != str:
            raise(ValueError("block version type error"))
        if block["version"] != hconfig.conf["VERSION_NO"]:
            raise(ValueError("block wrong version"))
        if type(block["timestamp"]) != int:
            raise(ValueError("block timestamp type error"))
        if block["timestamp"] < 0:
            raise(ValueError("block invalid timestamp"))
        if type(block["difficulty_bits"]) != int:
            raise(ValueError("block difficulty_bits type error"))
        if block["difficulty_bits"] <= 0:
            raise(ValueError("block difficulty_bits <= 0"))
        if type(block["nonce"]) != int:
            raise(ValueError("block nonce type error"))
        if block["nonce"] != hconfig.conf["NONCE"]:
            raise(ValueError("block nonce is invalid"))
        if type(block["height"]) != int:
            raise(ValueError("block height type error"))
        if block["height"] < 0:
            raise(ValueError("block height < 0"))
        if len(blockchain) == 0 and block["height"] != 0:
             raise(ValueError("genesis block invalid height"))
        if len(blockchain) > 0:
             if block["height"] != blockchain[-1]["height"] + 1:
                  raise(ValueError("block height is not in order"))
        # The length of the block must be less than the maximum block size that
        # specified in the config module.
        # json.dumps converts the block into a json format string.
        if len(json.dumps(block)) > hconfig.conf["MAX_BLOCK_SIZE"]:
            raise(ValueError("block length error"))
        # validate the merkle_root.
        if block["merkle_root"] != merkle_root(block["tx"], True):
            raise(ValueError("merkle roots do not match"))
        # validate the previous block by comparing message digests.
        # the genesis block does not have a predecessor block
        if block["height"] > 0:
            if block["prevblockhash"] != blockheader_hash(blockchain[block["height"]-1]):
                raise(ValueError("previous block header hash does not match"))
        else:
            if block["prevblockhash"] != "":
                raise(ValueError("genesis block has prevblockhash"))
        # genesis block does not have any input transactions
        if block["height"] == 0 and block["tx"][0]["vin"] !=[]:
            raise(ValueError("missing coinbase transaction"))
        # a block other than the genesis block must have at least
        # two transactions: the coinbase transaction and at least
        # one more transaction
        if block["height"] > 0 and len(block["tx"]) < 2:
            raise(ValueError("block only has one transaction"))
    except Exception as error:
        logging.error("exception: %s: %s", "validate_block",error)
        return False
    return True
def merkle_root(buffer: "List", start: "bool" = False) -> "bool or string":
    """
    merkle_tree: computes the merkle root for a list of transactions.
    Receives a list of transactions and a boolean flag to indicate whether
    the function has been called for the first time or whether it is a
    recursive call from within the function.
    Returns the root of the merkle tree or False if there is an error.
    """
    pass

Helium Blockchain Unit Tests

Copy the following code to a file called test_blockchain.py and save it in the unit_tests directory. You can now traverse to this directory and run all of the tests:
 $(virtual) pytest test_blockchain.py -v -s

You should see 23 unit tests passing.

Since our present implementation of the Helium blockchain is not complete, the following test suite is a restricted set. The complete set of unit tests on blockchain.py is available in Appendix 9 of this book:
"""
test blockchain functionality
transaction values are synthetic.
"""
import pytest
import hblockchain
import hconfig
import rcrypt
import time
import os
import pdb
import secrets
def teardown_module():
    """
    after all of the tests have been executed, remove any blocks that were created
    """
    os.system("rm *.dat")
    hblockchain.blockchain.clear()
###################################################
# Make A Synthetic Random Transaction For Testing
###################################################
def make_random_transaction(block_height):
    tx = {}
    tx["version"] =  "1"
    tx["transactionid"] = rcrypt.make_uuid()
    tx["locktime"] = secrets.randbelow(hconfig.conf["MAX_LOCKTIME"])
    # public-private key pair for previous transaction
    prev_keys = rcrypt.make_ecc_keys()
    # public-private key pair for this transaction
    keys = rcrypt.make_ecc_keys()
    # Build vin
    tx["vin"] = []
    if block_height > 0:
        ctr = secrets.randbelow(hconfig.conf["MAX_INPUTS"]) + 1
        ind = 0
        while ind < ctr:
            signed = rcrypt.sign_message(prev_keys[0], prev_keys[1])
            ScriptSig = []
            ScriptSig.append(signed[0])
            ScriptSig.append(prev_keys[1])
            tx["vin"].append({
                    "txid": rcrypt.make_uuid(),
                    "vout_index": ctr,
                    "ScriptSig": ScriptSig
                })
            ind += 1
    # Build Vout
    tx["vout"] = []
    ctr = secrets.randbelow(hconfig.conf["MAX_OUTPUTS"]) + 1
    ind = 0
    while ind < ctr:
        ScriptPubKey = []
        ScriptPubKey.append("DUP")
        ScriptPubKey.append("HASH-160")
        ScriptPubKey.append(keys[1])
        ScriptPubKey.append("EQ_VERIFY")
        ScriptPubKey.append("CHECK-SIG")
        tx["vout"] = {
                  "value": secrets.randbelow(10000000) + 1000000,   # helium cents
                  "ScriptPubKey": ScriptPubKey
                }
        ind += 1
    return tx
#############################################
# Build Three Synthetic Blocks For Testing
#############################################
block_0 = {
           "prevblockhash": "",
           "version": "1",
           "timestamp": 0,
           "difficulty_bits": 20,
           "nonce": 0,
           "merkle_root": rcrypt.make_SHA256_hash('msg0'),
           "height": 0,
           "tx": [make_random_transaction(0)]
}
block_1 = {
           "prevblockhash": hblockchain.blockheader_hash(block_0),
           "version": "1",
           "timestamp": 0,
           "difficulty_bits": 20,
           "nonce": 0,
           "merkle_root": rcrypt.make_SHA256_hash('msg1'),
           "height": 1,
}
block_1["tx"] = []
block_1["tx"].append(make_random_transaction(1))
block_1["tx"].append(make_random_transaction(1))
block_2 = {
           "prevblockhash": hblockchain.blockheader_hash(block_1),
           "version": "1",
           "timestamp": 0,
           "difficulty_bits": 20,
           "nonce": 0,
           "merkle_root": rcrypt.make_SHA256_hash('msg2'),
           "height": 2,
}
block_2["tx"] = []
block_2["tx"].append(make_random_transaction(2))
block_2["tx"].append(make_random_transaction(2))
def test_block_type(monkeypatch):
    """
    tests the type of a block
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    assert hblockchain.validate_block(block_0) == True
def test_add_good_block(monkeypatch):
    """
    test add a good block
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    assert hblockchain.add_block(block_0) == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    assert hblockchain.add_block(block_1) == True
    hblockchain.blockchain.clear()
def test_missing_version(monkeypatch):
    """
    test for a missing version number
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "version", "")
    assert hblockchain.add_block(block_1) == False
def test_version_bad(monkeypatch):
    """
    test for an unknown version number
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "version", -1)
    assert hblockchain.add_block(block_1) == False
def test_bad_timestamp_type(monkeypatch):
    """
    test for a bad timestamp type
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "timestamp", "12345")
    assert hblockchain.add_block(block_1) == False
def test_negative_timestamp(monkeypatch):
    """
    test for a negative timestamp
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "timestamp", -2)
    assert hblockchain.add_block(block_0) == False
def test_missing_timestamp(monkeypatch):
    """
    test for a missing timestamp
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "timestamp", "")
    assert hblockchain.add_block(block_1) == False
def test_block_height_type(monkeypatch):
    """
    test the type of the block height parameter
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "height", "0")
    hblockchain.blockchain.clear()
    assert hblockchain.add_block(block_0) == False
def test_bad_nonce(monkeypatch):
    """
    test for a negative nonce
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "nonce", -1)
    assert hblockchain.add_block(block_1) == False
def test_missing_nonce(monkeypatch):
    """
    test for a missing nonce
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "nonce", "")
    assert hblockchain.add_block(block_0) == False
def test_block_nonce_type(monkeypatch):
    """
    test nonce has the wrong type"
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "nonce", "0")
    assert hblockchain.add_block(block_0) == False
def test_negative_difficulty_bit(monkeypatch):
    """
    test for negative difficulty bits
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "difficulty_bits", -5)
    assert hblockchain.add_block(block_1) == False
def test_difficulty_type(monkeypatch):
    """
    test difficulty bits has the wrong type"
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "difficulty_bits", "20")
    assert hblockchain.add_block(block_0) == False
def test_missing_difficulty_bit(monkeypatch):
    """
    test for missing difficulty bits
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('data'))
    monkeypatch.setitem(block_1, "difficulty_bits", '')
    assert hblockchain.add_block(block_1) == False
def test_read_genesis_block(monkeypatch):
    """
    test reading the genesis block from the blockchain
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    hblockchain.add_block(block_0)
    assert hblockchain.read_block(0) == block_0
    hblockchain.blockchain.clear()
def test_genesis_block_height(monkeypatch):
    """
    test genesis block height
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    block_0["height"] = 0
    assert hblockchain.add_block(block_0) == True
    blk = hblockchain.read_block(0)
    assert blk != False
    assert blk["height"] == 0
    hblockchain.blockchain.clear()
def test_read_second_block(monkeypatch):
    """
    test reading the second block from the blockchain
    """
    hblockchain.blockchain.clear()
    assert len(hblockchain.blockchain) == 0
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_1, "prevblockhash", hblockchain.blockheader_hash(block_0))
    ret = hblockchain.add_block(block_0)
    assert ret == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    ret = hblockchain.add_block(block_1)
    assert ret == True
    block = hblockchain.read_block(1)
    assert block != False
    hblockchain.blockchain.clear()
def test_block_height(monkeypatch):
    """
    test height of the the second block
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "height", 0)
    monkeypatch.setitem(block_0, "prevblockhash", "")
    monkeypatch.setitem(block_1, "height", 1)
    monkeypatch.setitem(block_1, "prevblockhash", hblockchain.blockheader_hash(block_0))
    assert hblockchain.add_block(block_0) == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    assert hblockchain.add_block(block_1) == True
    blk = hblockchain.read_block(1)
    assert blk != False
    assert blk["height"] == 1
    hblockchain.blockchain.clear()
def test_block_size(monkeypatch):
    """
    The block size must be less than hconfig["MAX_BLOCKS"]
    """
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    arry = []
    filler = "0" * 2000000
    arry.append(filler)
    monkeypatch.setitem(block_0, "tx", arry)
    hblockchain.blockchain.clear()
    assert hblockchain.add_block(block_0) == False
def test_genesis_block_prev_hash(monkeypatch):
    """
    test that the previous block hash for the genesis block is empty
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_0, "height", 0)
    monkeypatch.setitem(block_0, "prevblockhash", rcrypt.make_uuid() )
    assert len(hblockchain.blockchain) == 0
    assert hblockchain.add_block(block_0) == False
def test_computes_previous_block_hash(monkeypatch):
    """
    test previous block hash has correct format
    """
    val = hblockchain.blockheader_hash(block_0)
    rcrypt.validate_SHA256_hash(val) == True
def test_invalid_previous_hash(monkeypatch):
    """
    test block's prevblockhash is invalid
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \  rcrypt.make_SHA256_hash('msg0'))
    monkeypatch.setitem(block_2, "prevblockhash", \
        "188a1fd32a1f83af966b31ca781d71c40f756a3dc2a7ac44ce89734d2186f632")
    hblockchain.blockchain.clear()
    assert hblockchain.add_block(block_0) == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    assert hblockchain.add_block(block_1) == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg2'))
    assert hblockchain.add_block(block_2) == False
    hblockchain.blockchain.clear()
def test_no_consecutive_duplicate_blocks(monkeypatch):
    """
    test cannot add the same block twice consecutively to the blockchain
    """
    hblockchain.blockchain.clear()
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg0'))
    assert hblockchain.add_block(block_0) == True
    monkeypatch.setattr(hblockchain, "merkle_root", lambda x, y: \ rcrypt.make_SHA256_hash('msg1'))
    monkeypatch.setitem(block_1, "prevblockhash", hblockchain.blockheader_hash(block_0))
    assert hblockchain.add_block(block_1) == True
    monkeypatch.setitem(block_1, "height", 2)
    assert hblockchain.add_block(block_1) == False
    hblockchain.blockchain.clear()

Conclusion

In this chapter, we have developed the rcrypt module that encapsulates the cryptographic functions that are used by Helium. We also wrote unit tests for our cryptographic module. We then wrote program code for the Helium blockchain. The module hblockchain encapsulates Helium blockchain functionality. Finally, we wrote some unit tests to validate the blockchain module.

In the next chapter, we turn our attention to the processing of cryptocurrency transactions.