Local communication using the net module

While many applications can run on a single thread and rely on the event loop, server applications will usually want to use all of the cores that are available to them. We can do this with processes or threads. In most cases, we are going to want to use threads since they are lighter and faster to start.

We can find out whether we need a process or a thread based on whether we need to have the subsystem still running if the main system dies. If we don't care, we should utilize a thread, but if we need to have that subsystem still running even after the main process dies, we should utilize a decoupled process. This is only one way of thinking about when to use a process or a thread, but it is a good indicator.

In both the browser and Node.js, workers (web workers in the browser, worker threads in Node.js) take the place of threads in traditional systems. While they have many of the same concepts as the threads of other languages, they do not share state by default (in our case, this is preferred).

There is, however, a way to share state between workers when we need it: SharedArrayBuffer. While we can utilize this to share state, we want to highlight that the event system and IPC are almost always fast enough to move state and coordinate between different pieces. With message passing, we also don't have to deal with concepts such as locks.
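As a rough illustration of the SharedArrayBuffer approach, the following sketch has several workers increment one shared counter. The runSharedCounter function and the inline worker source are illustrative names of our own, not part of this chapter's code, and Atomics is used so that the increments don't race with each other:

```javascript
import { Worker } from 'worker_threads';

// Worker source kept inline (eval: true) so the sketch fits in one file.
// Code passed with eval: true runs as a CommonJS script, hence require().
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  const counter = new Int32Array(workerData.shared);
  for (let i = 0; i < workerData.increments; i++) {
    Atomics.add(counter, 0, 1); // atomic read-modify-write, no lock needed
  }
  parentPort.postMessage('done');
`;

// Hypothetical helper: starts workerCount workers that all write to the
// same four bytes of memory and resolves with the final counter value.
function runSharedCounter(workerCount, increments) {
  const shared = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(shared);
  const jobs = [];
  for (let i = 0; i < workerCount; i++) {
    jobs.push(new Promise((resolve, reject) => {
      // A SharedArrayBuffer in workerData is shared, not copied.
      const worker = new Worker(workerSource, {
        eval: true,
        workerData: { shared, increments }
      });
      worker.once('message', resolve);
      worker.once('error', reject);
    }));
  }
  return Promise.all(jobs).then(() => Atomics.load(counter, 0));
}
```

Calling runSharedCounter(4, 1000) should resolve to 4000, since every increment is applied atomically to the same memory. The message-passing examples in the rest of this section share no memory at all.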

To start up a worker, we need to call new Worker(<script here>). Let's go over this concept:

  1. Create a file called Main_Worker.js and add the following code to it:
import { Worker } from 'worker_threads';

const data = {what: 'this', huh: 'yeah'};
const worker = new Worker('./worker.js');
worker.postMessage(data);
worker.on('message', (data) => {
    worker.terminate();
});
worker.on('exit', (code) => {
    console.log('our worker stopped with the following code: ', code);
});
  1. Create a file called worker.js and add the following code to it:
import { parentPort } from 'worker_threads';

parentPort.on('message', (msg) => {
    console.log('we received a message from our parent: ', msg);
    parentPort.postMessage({RECEIVED: true});
});

As we can see, this system is similar to the one in the browser. First, we import the Worker class from the worker_threads module. Then, we start the worker with the script that we want it to run. Once the thread has started, we can post messages to it and listen for events, similar to the way we were able to with processes in the previous chapter.

Inside the worker.js file, we import parentPort, the message channel back to the parent, from the worker_threads module. We listen for and send messages the same way as the parent does. Once we receive a message, we reply that we received it. The parent then terminates the worker and prints the code that the worker exited with.

Now, this form of message passing is perfectly fine if we want to tightly couple all of our subsystems together. But what if we want different threads to have different jobs? We could have one that just caches data for us. Another one could potentially make requests for us. Finally, our main thread (the starting process) can move all of this data and take in data from the command line.

To do all of this, we could simply use the built-in message passing. Alternatively, we could connect the pieces over a local socket with the net module, building on the mechanism that we looked at in the previous chapter. Not only does this give us a highly scalable system, but it also allows us to change these various subsystems from threads into processes if we need to. This also allows us to write these separate subsystems in another language, if needed. Let's go over this now:

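  1. Let's go ahead and make this system. We are going to create four files: main.js, cache.js, send.js, and package.json. The listings also import a pipeName function from a small helper file, which is not shown in this section. Our package.json file should look something like this: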
{
    "name" : "Chapter6_Local",
    "version" : "0.0.1",
    "type" : "module",
    "main" : "main.js"
}
  1. Next, add the following code to the cache.js file:
import net from 'net';
import pipeName from './helper.js';

let count = 0;
const cacheTable = new Map();
// markers for the commands that can arrive on the wire
const begin = Buffer.from('!!!BEGIN!!!');
const end = Buffer.from('!!!END!!!');
const get = Buffer.from('!!!GET!!!');
const del = Buffer.from('!!!DELETE!!!');
let currData = [];

const socket = new net.Socket().connect(pipeName());
socket.on('data', (data) => {
    if( data.toString('utf8') === 'WHOIS' ) {
        return socket.write('cache');
    }
    if( data.includes(get) ) {
        const loc = parseInt(data.slice(get.byteLength).toString('utf8'), 10);
        const d = cacheTable.get(loc);
        if( typeof d !== 'undefined' ) {
            socket.write(begin.toString('utf8') + d + end.toString('utf8'));
        }
        return;
    }
    if( data.includes(del) ) {
        if( data.byteLength === del.byteLength ) {
            cacheTable.clear();
        } else {
            const loc = parseInt(data.slice(del.byteLength).toString('utf8'), 10);
            cacheTable.delete(loc);
        }
        return;
    }
    // strip the wrappers; they are assumed to sit at the very start
    // and the very end of a chunk
    const hasBegin = data.includes(begin);
    const hasEnd = data.includes(end);
    let body = data;
    if( hasBegin ) {
        body = body.slice(begin.byteLength);
    }
    if( hasEnd ) {
        body = body.slice(0, body.byteLength - end.byteLength);
    }
    // each chunk is stored once, whether it opens, continues, or closes a message
    if( hasBegin || currData.length ) {
        currData.push(body.toString('utf8'));
    }
    if( hasEnd ) {
        cacheTable.set(count, currData.join(''));
        count += 1; // the next item gets the next location
        currData = [];
    }
});
This is definitely not a foolproof mechanism for handling streaming data. !!!BEGIN!!! and other command messages could be chunked and we would never see them. While we are keeping this simple, remember that production-level streaming needs to handle all of these types of issues.
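To make the chunking problem concrete, here is a minimal sketch of a receiver that still finds the wrappers when they are split across chunks. The createFrameParser name is our own, and the sketch assumes that anything outside a !!!BEGIN!!!/!!!END!!! pair can be thrown away, which is not true of the cache module above (it also has to see the other commands):

```javascript
const BEGIN = Buffer.from('!!!BEGIN!!!');
const END = Buffer.from('!!!END!!!');

// Hypothetical helper: returns a function that accepts raw chunks and calls
// onFrame(payload) once for every complete BEGIN...END message.
function createFrameParser(onFrame) {
  let pending = Buffer.alloc(0); // bytes received but not yet consumed

  return function push(chunk) {
    pending = Buffer.concat([pending, chunk]);
    while (true) {
      const start = pending.indexOf(BEGIN);
      if (start === -1) {
        // No complete BEGIN yet: keep only a tail that is short enough
        // to be the first part of a marker that was split in two.
        const keep = Math.min(pending.length, BEGIN.length - 1);
        pending = pending.subarray(pending.length - keep);
        return;
      }
      const bodyStart = start + BEGIN.length;
      const stop = pending.indexOf(END, bodyStart);
      if (stop === -1) {
        // The message is still open: drop what came before BEGIN and wait.
        pending = pending.subarray(start);
        return;
      }
      onFrame(pending.subarray(bodyStart, stop).toString('utf8'));
      pending = pending.subarray(stop + END.length);
    }
  };
}
```

A socket handler could then be as small as socket.on('data', createFrameParser(store)), where store is whatever function saves the payload. Concatenating buffers on every chunk is wasteful for large messages, so this is a starting point rather than a production parser.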

The cache submodule checks each message for a header and performs the matching action. This can be thought of as a simple way to do remote procedure calls. It understands WHOIS, !!!GET!!!, !!!DELETE!!!, and the !!!BEGIN!!!/!!!END!!! pair that wraps data to be cached; each of these is described in more detail once we have looked at the send submodule.

  1. Add the following code to the send.js file:
import net from 'net';
import https from 'https';
import pipeName from './helper.js';

const socket = new net.Socket().connect(pipeName());
socket.on('data', (data) => {
    if( data.toString('utf8') === 'WHOIS' ) {
        return socket.write('send');
    }
    const all = [];
    https.get(data.toString('utf8'), (res) => {
        // collect the response body chunk by chunk
        res.on('data', (chunk) => {
            all.push(chunk.toString('utf8'));
        });
        res.on('end', () => {
            socket.write('!!!BEGIN!!!');
            socket.write(all.join(''));
            socket.write('!!!END!!!');
        });
    }).on('error', (err) => {
        socket.write('!!!FALSE!!!');
    });
    console.log('we received data from the main application',
        data.toString('utf8'));
});

For each of the submodules that we have, we handle specific commands that may come across the wire. As shown by the send submodule, we handle anything on the wire other than the WHOIS command, which tells the main application who is connected to it. We try to grab the file from the specified address and write it back to the main application so that it's stored in the cache.

We also added our own protocol to send the data. While this system is not foolproof and we should add some type of locking (such as a Boolean, so that we don't try to take in any more data before fully sending out the current data), it does showcase how we can send data across our system. In Chapter 7, Streams - Understanding Streams and Non-Blocking I/O, we will look at a similar concept, but we will utilize streams so that we don't use so much memory per thread.

As we can see, we're only importing the https module. This means that we are only allowed to make requests to addresses that are served over HTTPS. If we wanted to support HTTP, we would have to import the http module and then check the address that the user types in. In our case, we made it as simple as possible.

When we want to send data, we send the !!!BEGIN!!! message to let the receiver know that we are about to send data that will not fit into a single frame. Then, we end our message with the !!!END!!! message.

If we can't read the endpoint that we are trying to grab or our connection times out (both of these will drop into the error condition), we will send a !!!FALSE!!! message to let the receiver know that we are unable to fully transmit the data.

This concept of wrapping our data in frames is used in almost all data transmission systems. Without framing, we would have to send a header that says how large the data transmission is. However, that would mean we need to know the size of the content before we send it. Framing gives us the option of not sending the length of messages, so we can process arbitrarily large messages.
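For contrast, the header-based alternative can be sketched as follows. The encodeMessage and createLengthDecoder names, and the choice of a 4-byte big-endian length, are assumptions made for this illustration only; the point is that the sender has to know the full size before it writes the first byte:

```javascript
// Sender side: a 4-byte big-endian length header followed by the payload.
function encodeMessage(text) {
  const body = Buffer.from(text, 'utf8');
  const header = Buffer.alloc(4);
  header.writeUInt32BE(body.length, 0); // size must be known up front
  return Buffer.concat([header, body]);
}

// Receiver side: returns a function that buffers chunks and hands back
// every message that has fully arrived so far.
function createLengthDecoder() {
  let pending = Buffer.alloc(0);

  return function push(chunk) {
    pending = Buffer.concat([pending, chunk]);
    const messages = [];
    while (pending.length >= 4) {
      const size = pending.readUInt32BE(0);
      if (pending.length < 4 + size) {
        break; // the rest of this message has not arrived yet
      }
      messages.push(pending.subarray(4, 4 + size).toString('utf8'));
      pending = pending.subarray(4 + size);
    }
    return messages;
  };
}
```

With the !!!BEGIN!!!/!!!END!!! wrappers, the send worker could start forwarding the response body before the download has finished; with a length header, it has to buffer the whole body first to count its bytes.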

Framing or even boxing the data is done everywhere. If we were to look at how packets are created, for example, the concept still applies. Understanding this concept is key to understanding lower levels of the communication stack. Another concept that is good to know about is that not all of this data is sent at once. It is sent in pieces. The amount that can be sent at one time is usually set at the operating system level. One of the only properties that we can set is the highWaterMark property on streams. This property allows us to say how much data we will hold in memory before we stop reading/writing.
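The effect of highWaterMark can be seen with a small Writable stream. This is only a sketch: the sink below is a stand-in that pretends to be slow, and the 8-byte mark is chosen purely so that two small writes are enough to reach it:

```javascript
import { Writable } from 'stream';

// A slow sink: each chunk is completed on a later tick, so data queues up.
const sink = new Writable({
  highWaterMark: 8, // bytes to hold before write() starts returning false
  write(chunk, encoding, callback) {
    setImmediate(callback);
  }
});

const firstOk = sink.write(Buffer.alloc(4));  // 4 bytes held, below the mark
const secondOk = sink.write(Buffer.alloc(4)); // 8 bytes held, mark reached

sink.once('drain', () => {
  // The held data has been flushed; it is safe to resume writing here.
});
```

Here, firstOk is true and secondOk is false. A return value of false does not mean the data was dropped; it is a signal that the writer should pause until the drain event fires.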

The cache application acts similarly to the send submodule, except that it responds to more commands. If we get a get command, we try to grab that item from the cache and send it back to the main module; if the item is missing, the listing shown here sends nothing back. If we get a delete command with no other arguments, we delete the entire cache; otherwise, we delete the item at that specific location. Finally, if we get the beginning or ending wrappers, we process the data and cache it.

Currently, our cache grows without bound. We could easily add a time threshold for how long data is allowed to stay in the cache (Time To Live, or TTL), or only hold a certain number of records, usually by utilizing a Least Recently Used (LRU) eviction policy. We will look at how to implement caching strategies in Chapter 9, Practical Example - Building a Static Server. Just note that these concepts are ubiquitous in caches and caching strategies.
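To give a feel for what these two limits look like in code, here is a small sketch that combines them. The LruTtlCache class is hypothetical and is not the implementation used later in the book; it relies on the fact that a Map iterates its keys in insertion order:

```javascript
// Hypothetical cache with a size cap (LRU eviction) and a per-entry TTL.
class LruTtlCache {
  constructor(maxEntries, ttlMs, now = Date.now) {
    this.maxEntries = maxEntries;
    this.ttlMs = ttlMs;
    this.now = now;           // injectable clock, handy for testing
    this.entries = new Map(); // oldest entry first, newest entry last
  }

  get(key) {
    const entry = this.entries.get(key);
    if (entry === undefined) {
      return undefined;
    }
    if (this.now() - entry.storedAt > this.ttlMs) {
      this.entries.delete(key); // the entry has outlived its TTL
      return undefined;
    }
    // Re-insert so that this key becomes the most recently used one.
    this.entries.delete(key);
    this.entries.set(key, entry);
    return entry.value;
  }

  set(key, value) {
    this.entries.delete(key);
    this.entries.set(key, { value, storedAt: this.now() });
    if (this.entries.size > this.maxEntries) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value;
      this.entries.delete(oldest);
    }
  }
}
```

In cache.js, an instance of such a class could stand in for cacheTable, since it offers the same get and set calls; the clear and delete methods used by the delete command would still need to be added.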

Heading back into the code, create main.js and add the following code:

  1. Create placeholders for our state variables. These correspond to the various states that our messages could be in and the data that is passing through the socket:
import net from 'net';
import { Worker } from 'worker_threads';
import pipeName from './helper.js';

const table = new Map();
let currData = [];
// buffers for the !!!FALSE!!!, !!!BEGIN!!!, and !!!END!!! markers
const failure = Buffer.from('!!!FALSE!!!');
const begin = Buffer.from('!!!BEGIN!!!');
const end = Buffer.from('!!!END!!!');
const methodTable = new WeakMap();
  1. Create the method to handle data that comes in through our cache:

const cacheHandler = function(data) {
    // store each chunk once, from the opening wrapper to the closing one
    if( data.includes(begin) || currData.length ) {
        currData.push(data.toString('utf8'));
    }
    if( data.includes(end) ) {
        const final = currData.join('');
        // print the payload without its wrappers
        console.log(final.substring(begin.byteLength,
            final.length - end.byteLength));
        currData = [];
    }
}
  1. Next, add the method that will handle the messages from our send worker:

const sendHandler = function(data) {
    if( data.equals(failure) ) {
        // the send worker could not fetch the resource
        return;
    }
    // store each chunk once, from the opening wrapper to the closing one
    if( data.includes(begin) || currData.length ) {
        currData.push(data.toString('utf8'));
    }
    if( data.includes(end) ) {
        // forward the complete message, wrappers included, to the cache
        table.get('cache').write(currData.join(''));
        currData = [];
    }
}
  1. Create two final helper methods. The first tests the number of connected workers so that we know when we are ready to start; the second adds the matching handler to each worker socket:

const testConnections = function() {
    return table.size === 2;
}
const setupHandlers = function() {
    table.forEach((value, key) => {
        // look up the handler registered for this socket and attach it
        value.on('data', methodTable.get(value).bind(value));
    });
}
  1. The final large method will handle all of the messages that we receive via the command line:
const startCLIMode = function() {
    process.stdin.on('data', function(data) {
        const d = data.toString('utf8');
        const instruction = d.trim().split(/\s+/);
        switch(instruction[0]) {
            case 'delete': {
                table.get('cache').write(`!!!DELETE!!!${instruction[1] || ''}`);
                break;
            }
            case 'get': {
                if( typeof instruction[1] === 'undefined' ) {
                    return console.log('get needs a number associated with it!');
                }
                table.get('cache').write(`!!!GET!!!${instruction[1]}`);
                break;
            }
            case 'grab': {
                table.get('send').write(instruction[1]);
                break;
            }
            case 'stop': {
                table.forEach((value, key) => value.end());
                process.exit();
                break;
            }
        }
    });
}
  1. Finally, create the server and start the workers:
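const server = net.createServer().listen(pipeName());
server.on('connection', (socket) => {
    let type; // declared here so that the close handler can see it
    socket.once('data', (data) => {
        type = data.toString('utf8');
        table.set(type, socket);
        // map the socket to its handler (assumed wiring; this part is
        // not shown in the shortened listing)
        methodTable.set(socket, type === 'cache' ? cacheHandler : sendHandler);
        if( testConnections() ) {
            setupHandlers();
            startCLIMode();
        }
    });
    socket.once('close', () => {
        table.delete(type);
    });
    socket.write('WHOIS');
});

const cache = new Worker('./cache.js');
const send = new Worker('./send.js');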

Certain parts of the main file have been removed to shorten the amount of code in this book. The full example can be found in this book's GitHub repository.

Here, we have a bunch of helpers that will handle messages from the cache and send subsystems. We also map the socket to our handler. The utilization of a WeakMap means that we don't need to clean up if these subsystems ever crash or are somehow removed. We also map the name of the subsystem to the socket so that we can easily send messages to the correct subsystem. Finally, we create a server and handle the incoming connections. In our case, we only want to check for two subsystems. Once we can see two, we start our program.

There are some flaws in the way we wrap our messages, and testing the number of connections to see whether we are ready is also not the best way to handle our program. However, this does allow us to create a somewhat complex application so that we can quickly test the ideas that can be seen here. With this application, we are now able to cache various files from a remote resource and grab them when we want them. This is a system that is similar to how some static servers work.

By looking at the preceding application, it is easy to see how we can utilize local connections to create a message-passing system with only the core Node.js system. What is also interesting is that we can change the listen method's argument (and the matching connect calls in the workers) from a pipe name to a port number, and this application will use TCP sockets instead of named pipes/Unix domain sockets.
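The pipeName helper that the listings import is not shown in this section. A minimal sketch of what such a helper could look like is given here; the default name and the /tmp location are assumptions, and the real helper in the book's repository may differ:

```javascript
// Hypothetical helper.js: returns the address shared by main.js and the workers.
function pipeName(name = 'chapter6_local') {
  if (process.platform === 'win32') {
    // Windows named pipes live under the \\.\pipe\ namespace.
    return '\\\\.\\pipe\\' + name;
  }
  // Elsewhere, use a Unix domain socket path. The /tmp directory is an assumption.
  return '/tmp/' + name + '.sock';
}

// In helper.js, this function would be the module's default export:
// export default pipeName;
```

With such a helper, main.js calls net.createServer().listen(pipeName()) and each worker calls connect(pipeName()). Swapping both for a port number, say listen(3333) and connect(3333) (the port is an arbitrary choice), moves the same program onto TCP. On Unix-like systems, a socket file left behind by an earlier run has to be removed before listening again; otherwise, listen fails with EADDRINUSE.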

Before we had these worker threads inside of Node.js, we had to separate everything out with processes. In the beginning, we only had the fork system. This made some systems quite complex when we started creating more processes. To help us with this, the cluster module was created. With the cluster module, it's easier to manage processes in a primary/worker architecture (historically called master/slave).