CHAPTER-2
Async
Node.js was created in reaction to slow web servers in Ruby and other dynamic languages at that time.
These servers were slow because they were only capable of handling a single request at a time. Any work that involved I/O (e.g. network or file system access) was “blocking”. The program would not be able to perform any work while waiting on these blocking resources.
Node.js is able to handle many requests concurrently because it is non-blocking by default. Node.js can continue to perform work while waiting on slow resources.
The simplest, and most common form of asynchronous execution within Node.js is the callback. A callback is a way to specify that “after X happens, do Y”. Typically, “X” will be some form of slow I/O (e.g. reading a file), and “Y” will be work that incorporates the result (e.g. processing data from that file).
If you’re familiar with JavaScript in the browser, you’ll find a similar pattern all over the place. For example:
window.addEventListener('resize', () => console.log('window has been resized!'))
To translate this back into words: “After the window is resized, print ‘window has been resized!’”
Here’s another example:

setTimeout(() => console.log('hello from the past'), 5000)

To translate this into words: “After 5 seconds, print ‘hello from the past.’”

This can be confusing for most people the first time they encounter it. This is understandable because it requires thinking about multiple points in time at once.
In other languages, we expect work to be performed in the order it is written in the file. However, in JavaScript, we can make the following lines print in the reverse order from how they are written:
const tenYears = 10 * 365 * 24 * 60 * 60 * 1000

setTimeout(() => console.log('hello from the past'), tenYears)
console.log('hello from the present')
If we were to run the above code, we would immediately see “hello from the present” and, 10 years later, we would see “hello from the past”.
Importantly, because setTimeout() is non-blocking, we don’t need to wait 10 years to print “hello from the present” — it happens immediately after.
Let’s take a closer look at this non-blocking behavior. We’ll set up both an interval and a timeout. Our interval will print the running time of our script in seconds, and our timeout will print “hello from the past” after 5.5 seconds (and then exit so that we don’t count forever):
let count = 0

setInterval(() => console.log(`${++count} mississippi`), 1000)

setTimeout(() => {
  console.log('hello from the past!')
  process.exit()
}, 5500)
process is a globally available object in Node.js. We don’t need to use require() to access it. In addition to providing us the process.exit() method, it’s also useful for getting command-line arguments with process.argv and environment variables with process.env. We’ll cover these and more in later chapters.
If we run this with node 01-set-timeout.js we should expect to see something like this:
Our script dutifully counts each second, until our timeout function executes after 5.5 seconds, printing “hello from the past!” and exiting the script.
Let’s compare this to what would happen if instead of using a non-blocking setTimeout(), we use a blocking setTimeoutSync() function:
let count = 0

setInterval(() => console.log(`${++count} mississippi`), 1000)

setTimeoutSync(5500)

console.log('hello from the past!')
process.exit()

function setTimeoutSync (ms) {
  const t0 = Date.now()
  while (Date.now() - t0 < ms) {}
}
We’ve created our own setTimeoutSync() function that will block execution for the specified number of milliseconds. This will behave more similarly to other blocking languages. However, if we run it, we’ll see a problem:
What happened to our counting?
In our previous example, Node.js was able to perform two sets of instructions concurrently. While we were waiting on the “hello from the past!” message, we were seeing the seconds get counted. However, in this example, Node.js is blocked and is never able to count the seconds.
Node.js is non-blocking by default, but as we can see, it’s still possible to block. Node.js is single-threaded, so long-running loops like the one in setTimeoutSync() will prevent other work from being performed (e.g. our interval function to count the seconds). In fact, if we were to use setTimeoutSync() in our API server in chapter 1, our server would not be able to respond to any browser requests while that function is active!
In this example, our long-running loop is intentional, but in the future we’ll be careful not to unintentionally create blocking behavior like this. Node.js is powerful because of its ability to handle many requests concurrently, but it’s unable to do that when blocked.
Of course, this works the same with JavaScript in the browser. The reason why async functions like setTimeout() exist is so that we don’t block the execution loop and freeze the UI. Our setTimeoutSync() function would be equally problematic in a browser environment.
What we’re really talking about here is having the ability to perform tasks on different timelines. We’ll want to perform some tasks sequentially and others concurrently. Some tasks should be performed immediately, and others should be performed only after some criteria has been met in the future.
JavaScript and Node.js may seem strange because, to avoid blocking, they don’t run everything sequentially in a single timeline. However, we’ll see that this gives us a lot of power to efficiently program tasks involving multiple timelines.
In the next sections we’ll cover some different ways that Node.js allows us to do this using asynchronous execution. Callback functions like the one seen in setTimeout() are the most common and straightforward, but we also have other techniques. These include promises, async/await, event emitters, and streams.
Callbacks
Node.js-style callbacks are very similar to how we would perform asynchronous execution in the browser, and are just a slight variation on our setTimeout() example above.
Interacting with the filesystem is extremely slow relative to interacting with system memory or the CPU. This slowness makes it conceptually similar to setTimeout( ).
While loading a small file may only take two milliseconds to complete, that’s still a really long time — enough to do over 10,000 math operations. Node.js provides us asynchronous methods to perform these tasks so that our applications can continue to perform operations while waiting on I/O and other slow tasks.
Here’s what it looks like to read a file using the core fs module:
The core fs module has methods that allow us to interact with the filesystem. Most often we’ll use this to read and write to files, get file information such as size and modified time, and see directory listings. In fact, we’ve already used it in the first chapter to send static files to the browser.
const fs = require('fs')
const filename = '03-read-file-callback.js'

fs.readFile(filename, (err, fileData) => {
  if (err) return console.error(err)
  console.log(`${filename}: ${fileData.length}`)
})
From this example we can see that fs.readFile() expects two arguments, filename and callback:
fs.readFile(filename, callback)
setTimeout() also expects two arguments, callback and delay:
setTimeout(callback, delay)
This difference in ordering highlights an important Node.js convention. In Node.js official APIs (and most third-party libraries) the callback is always the last argument.
The second thing to notice is the order of the arguments in the callback itself:
fs.readFile(filename, (err, fileData) => {
Here we provide an anonymous function as the callback to fs.readFile(), and our anonymous function accepts two arguments: err and fileData. This shows off another important Node.js convention: the error (or null if no error occurred) is the first argument when executing a provided callback.
This convention signifies the importance of error handling in Node.js. The error is the first argument because we are expected to check and handle the error first before moving on.
In this example, that means first checking to see if the error exists and, if so, printing it out with console.error() and skipping the rest of our function by returning early. Only if err is falsy do we print the filename and file size:
fs.readFile(filename, (err, fileData) => {
  if (err) return console.error(err)
  console.log(`${filename}: ${fileData.length}`)
})
Run this file with node 03-read-file-callback.js and you should see output like:
To trigger our error handling, change the filename to something that doesn’t exist and you’ll see something like this:
If you were to comment out our line for error handling, you would see a different error: TypeError: Cannot read property 'length' of undefined. Unlike the error above, this would crash our script.
Node.js also provides a blocking counterpart to fs.readFile(): fs.readFileSync().
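Here’s a minimal sketch of what using it looks like (reading the same file as the example above):

const fs = require('fs')

const filename = '03-read-file-callback.js'

try {
  // Blocks until the entire file has been read; errors are thrown
  // instead of being passed to a callback
  const fileData = fs.readFileSync(filename)
  console.log(`${filename}: ${fileData.length}`)
} catch (err) {
  console.error(err)
}

While fs.readFileSync() is running, Node.js can’t do anything else, which is why the asynchronous form is usually the better choice in a server.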
We now know how basic async operations work in Node.js. If instead of reading a file, we wanted to get a directory list, it would work similarly. We would call fs.readdir(), and because of Node.js convention, we could guess that the first argument is the directory path and the last argument is a callback. Furthermore, we know the callback that we pass should expect an error as the first argument and the directory listing as the second argument.
fs.readdir(directoryPath, (err, fileList) => {
  if (err) return console.error(err)
  console.log(fileList)
})
If you’re wondering why fs.readFile() and fs.readdir() are capitalized differently, it’s because readdir is a system call, and fs.readdir() follows its C naming convention. fs.readFile() and most other methods are higher-level wrappers and conform to the typical JavaScript camelCase convention.
Let’s now move beyond using single async methods in isolation. In a typical real-world app, we’ll need to use multiple async calls together, where we use the output from one as the input for others.
Async in Series and Parallel
In the previous section, we learned how to perform asynchronous actions in series by using a callback to wait until an asynchronous action has completed. In this section, we’ll not only perform asynchronous actions in series, but we will also perform a group of actions in parallel.
Now that we know how to read files and directories, let’s combine these so that we can first get a directory list, and then read each file on that list. In short, our program will:
- Get a directory list.
- For each item on that list, print the file’s name and size (in the same alphabetical order as the list).
- Once all names and sizes have been printed, print “done!”
We might be tempted to write something like this:
const fs = require('fs')

fs.readdir('./', (err, files) => {
  if (err) return console.error(err)

  files.forEach(function (file) {
    fs.readFile(file, (err, fileData) => {
      if (err) return console.error(err)
      console.log(`${file}: ${fileData.length}`)
    })
  })

  console.log('done!')
})
If we run node 05-read-dir-callbacks-broken.js, we’ll see some problems with this approach.
Let’s look at the output:
Two problems jump out at us. First, we can see that “done!” is printed before all of our files, and second, our files are not printed in the alphabetical order that fs.readdir() returns them in.
Both of these problems stem from the following lines:
files.forEach(function (file) {
  fs.readFile(file, (err, fileData) => {
    if (err) return console.error(err)
    console.log(`${file}: ${fileData.length}`)
  })
})

console.log('done!')
If we were to run the following, we would have the same issue:
const seconds = [5, 2]

seconds.forEach(s => {
  setTimeout(() => console.log(`Waited ${s} seconds`), 1000 * s)
})

console.log('done!')
Just like setTimeout(), fs.readFile() does not block execution. Therefore, Node.js is not going to wait for the file data to be read before printing “done!” Additionally, even though 5 is first in the seconds array, “Waited 5 seconds” will be printed last. In this example it’s obvious because 5 seconds is a longer amount of time than 2 seconds, but the same issue happens with fs.readFile(); it takes less time to read some files than others.
If we can’t use Array.forEach() to do what we want, what can we do? The answer is to create our own async iterator. Just like how there can be synchronous variants of asynchronous functions, we can make async variants of synchronous functions. Let’s take a look at an async version of Array.map():
function mapAsync (arr, fn, onFinish) {
  let prevError
  let nRemaining = arr.length
  const results = []

  arr.forEach(function (item, i) {
    fn(item, function (err, data) {
      if (prevError) return

      if (err) {
        prevError = err
        return onFinish(err)
      }

      results[i] = data
      nRemaining--

      if (!nRemaining) onFinish(null, results)
    })
  })
}
If this function looks confusing, that’s OK; it’s abstract so that it can accept an arbitrary array (arr) and iterator function (fn), and it takes advantage of Node.js conventions to make things work. We’ll go through it piece by piece to make sure everything is clear.
For comparison, let’s look at a simple synchronous version of Array.map():
function mapSync (arr, fn) {
  const results = []

  arr.forEach(function (item, i) {
    const data = fn(item)
    results[i] = data
  })

  return results
}
At a high level, our mapAsync() is very similar to mapSync(), but it needs a little extra functionality to make async work. The main additions are that it (1) keeps track of how many items from the array (arr) have been processed, (2) keeps track of whether or not there’s been an error, and (3) accepts a final callback (onFinish) to run after all items have been successfully processed or an error occurs.
Just like mapSync(), the first two arguments of mapAsync() are arr, an array of items, and fn, a function to be executed for each item in the array. Unlike mapSync(), mapAsync() expects fn to be an asynchronous function. This means that when we execute fn() for an item in the array, it won’t immediately return a result; the result will be passed to a callback.
Therefore, instead of being able to synchronously assign values to the results array in mapSync():
arr.forEach(function (item, i) {
  const data = fn(item)
  results[i] = data
})
We need to assign values to the results within the callback of fn() in mapAsync():
arr.forEach(function (item, i) {
  fn(item, function (err, data) {
    results[i] = data
  })
})
This means that when using mapAsync(), we expect the given iterator function, fn(), to follow Node.js convention by accepting an item from arr as its first argument and a callback as its last argument. This ensures that any function following Node.js convention can be used. For example, fs.readFile() could be given as the fn argument to mapAsync() because it can be called in the form fs.readFile(filename, callback).
Because the fn() callback for each item will execute in a different order than the items array (just like our setTimeout() example above), we need a way to know when we are finished processing all items in the array. In our synchronous version, we know that we’re finished when the last item is processed, but that doesn’t work for mapAsync().

To solve this problem, we need to keep track of how many items have been completed successfully. By keeping track, we can make sure that we only call onFinish() after all items have been completed successfully.

Specifically, within the fn() callback, after we assign a value to our results array, we check to see if we have processed all items. If we have, we call onFinish() with the results, and if we haven’t, we do nothing.
function mapAsync (arr, fn, onFinish) {
  let nRemaining = arr.length
  const results = []

  arr.forEach(function (item, i) {
    fn(item, function (err, data) {
      results[i] = data
      nRemaining--

      if (!nRemaining) onFinish(null, results)
    })
  })
}
At this point, if everything goes well, we’ll call onFinish() with a correctly ordered results array. Of course, as we know, everything does not always go well, and we need to know when it doesn’t.
Node.js convention is to call callbacks with an error as the first argument if one exists. In the above example, if we call mapAsync() and any of the items has an error processing, we’ll never know. For example, if we were to use it with fs.readFile() on files that don’t exist:
mapAsync(['file1.js', 'file2.js'], fs.readFile, (err, filesData) => {
  if (err) return console.error(err)
  console.log(filesData)
})
Our output would be [ undefined, undefined ]. This is because err is null, and without proper error handling, our mapAsync() function will fill the results array with undefined values. We’ve received a faulty results array instead of an error. This is the opposite of what we want; we want to follow Node.js convention so that we receive an error instead of the results array.
If any of our items has an error, we want to call onFinish(err) instead of onFinish(null, results). To make sure onFinish() is only called with the results after all items have finished successfully, we return early when we encounter an error:
function mapAsync (arr, fn, onFinish) {
  let nRemaining = arr.length
  const results = []

  arr.forEach(function (item, i) {
    fn(item, function (err, data) {
      if (err) return onFinish(err)

      results[i] = data
      nRemaining--

      if (!nRemaining) onFinish(null, results)
    })
  })
}
Now, if we run into an error, we’ll immediately call onFinish(err). In addition, because we don’t decrease our nRemaining count for the item with an error, nRemaining never reaches 0, and onFinish(null, results) is never called.

Unfortunately, this opens us up to another problem. If we have multiple errors, onFinish(err) will be called multiple times; onFinish() is expected to be called only once.

Preventing this is simple. We can keep track of whether or not we’ve encountered an error already. Once we’ve encountered an error and called onFinish(err), we know that (A) if we encounter another error, we should not call onFinish(err) again, and (B) even if we don’t encounter another error, we’ll never have a complete set of results. Therefore, there’s nothing left to do, and we can stop:
function mapAsync (arr, fn, onFinish) {
  let prevError
  let nRemaining = arr.length
  const results = []

  arr.forEach(function (item, i) {
    fn(item, function (err, data) {
      if (prevError) return

      if (err) {
        prevError = err
        return onFinish(err)
      }

      results[i] = data
      nRemaining--

      if (!nRemaining) onFinish(null, results)
    })
  })
}
Now that we’ve added proper error handling, we have a useful asynchronous map() function that we can use any time we want to perform a task concurrently for an array of items. By taking advantage of and following Node.js convention, our mapAsync() is directly compatible with most core Node.js API methods and third-party modules.
Here’s how we can use it for our directory listing challenge:
fs.readdir('./', function (err, files) {
  if (err) return console.error(err)

  mapAsync(files, fs.readFile, (err, results) => {
    if (err) return console.error(err)

    results.forEach((data, i) => console.log(`${files[i]}: ${data.length}`))
    console.log('done!')
  })
})
Creating A Function
Now that we have a general-purpose async map and we’ve used it to get a directory listing and read each file in the directory, let’s create a general-purpose function that can perform this task for any specified directory.
To achieve this we’ll create a new function getFileLengths() that accepts two arguments: a directory and a callback. This function will first call fs.readdir() using the provided directory, then it will pass the file list and fs.readFile to mapAsync(), and once that is all finished, it will call the callback with an error (if one exists) or the results array — pairs of filenames and lengths.
Once we’ve created getFileLengths(), we’ll be able to use it like this:
const targetDirectory = process.argv[2] || './'

getFileLengths(targetDirectory, function (err, results) {
  if (err) return console.error(err)

  results.forEach(([file, length]) => console.log(`${file}: ${length}`))
  console.log('done!')
})
process.argv is a globally available array of the command-line arguments used to start Node.js. If you were to run a script with the command node file.js, process.argv would be equal to ['node', 'file.js']. In this case we allow our file to be run like node 06c-read-dir-callbacks-cli.js ../another-directory. In that case process.argv[2] will be ../another-directory.
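For example, a one-line script (call it print-argv.js; the name is just for illustration) makes the shape of process.argv easy to see. In practice, the first two entries are absolute paths to the node binary and to the script file:

console.log(process.argv)

// $ node print-argv.js ../another-directory
// [ '/usr/local/bin/node',
//   '/path/to/print-argv.js',
//   '../another-directory' ]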
To make this work we’ll define our new getFileLengths() function:
function getFileLengths (dir, cb) {
  fs.readdir(dir, function (err, files) {
    if (err) return cb(err)

    const filePaths = files.map(file => path.join(dir, file))

    mapAsync(filePaths, readFile, cb)
  })
}
Unsurprisingly, the first thing we do is use fs.readdir() to get the directory listing. We also follow Node.js convention: if there’s an error, we return early and call the callback cb with the error.
Next, we need to perform a little extra work on our file list for this function to be generalizable to any specified directory. fs.readdir() will only return file names — it will not return the full directory paths. This was fine for our previous example, because we were getting the file list of ./, our current working directory, and fs.readFile() wouldn’t need the full path to read a file in the current working directory. However, if we want this function to work for other directories, we need to be sure to pass more than just the file name to fs.readFile(). We use the built-in path module (require('path')) to combine our specified directory with each file name to get an array of file paths.
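For example, path.join() handles the separator (and any redundant ./) for us; the file name here is just for illustration:

const path = require('path')

console.log(path.join('./', '04-read-dir.js'))
// => '04-read-dir.js'
console.log(path.join('../another-directory', '04-read-dir.js'))
// => '../another-directory/04-read-dir.js'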
After we have our file paths array, we pass it to our mapAsync() function along with a customized readFile() function. We’re not using fs.readFile() directly, because we need to alter its output a little bit.
It’s often useful to wrap an async function with your own to make small changes to expected inputs and/or outputs. Here’s what our customized readFile() function looks like:
function readFile (file, cb) {
  fs.readFile(file, function (err, fileData) {
    if (err) {
      if (err.code === 'EISDIR') return cb(null, [file, 0])
      return cb(err)
    }

    cb(null, [file, fileData.length])
  })
}
In our previous example, we could be sure that there were no subdirectories. When accepting an arbitrary directory, we can’t be so sure. Therefore, we might be calling fs.readFile() on a directory.

If this happens, fs.readFile() will give us an error. Instead of halting the whole process, we’ll treat directories as having a length of 0.
Additionally, we want our readFile() function to return an array of both the file path and the file length. If we were to use the stock fs.readFile(), we would only get the data.

readFile() will be called once for each file path, and after they have all finished, the callback passed to mapAsync() will be called with an array of the results. In this case, the results will be an array of arrays. Each array within the results array will contain a file path and a length.

Looking back at where we call mapAsync() within our getFileLengths() definition, we can see that we’re taking the callback cb passed to getFileLengths() and handing it directly to mapAsync():
function getFileLengths (dir, cb) {
  fs.readdir(dir, function (err, files) {
    if (err) return cb(err)

    const filePaths = files.map(file => path.join(dir, file))

    mapAsync(filePaths, readFile, cb)
  })
}
This means that the results of mapAsync() will be the results of getFileLengths(). It is functionally equivalent to:
mapAsync(filePaths, readFile, (err, results) => cb(err, results))
Here’s our full implementation of getFileLengths():
const fs = require('fs')
const path = require('path')

const targetDirectory = process.argv[2] || './'

getFileLengths(targetDirectory, function (err, results) {
  if (err) return console.error(err)

  results.forEach(([file, length]) => console.log(`${file}: ${length}`))
  console.log('done!')
})

function getFileLengths (dir, cb) {
  fs.readdir(dir, function (err, files) {
    if (err) return cb(err)

    const filePaths = files.map(file => path.join(dir, file))

    mapAsync(filePaths, readFile, cb)
  })
}

function readFile (file, cb) {
  fs.readFile(file, function (err, fileData) {
    if (err) {
      if (err.code === 'EISDIR') return cb(null, [file, 0])
      return cb(err)
    }

    cb(null, [file, fileData.length])
  })
}

function mapAsync (arr, fn, onFinish) {
  let prevError
  let nRemaining = arr.length
  const results = []

  arr.forEach(function (item, i) {
    fn(item, function (err, data) {
      if (prevError) return

      if (err) {
        prevError = err
        return onFinish(err)
      }

      results[i] = data
      nRemaining--

      if (!nRemaining) onFinish(null, results)
    })
  })
}
Wrapping Up
Callbacks can be quite confusing at first because of how different they can be from other languages. However, they are a powerful convention that allows us to create async variations of common synchronous tasks. Additionally, because of their ubiquity in Node.js core modules, it’s important to be comfortable with them.
That said, there are alternative forms of async that build on top of these concepts that many people find easier to work with. Next up, we’ll talk about promises.
Promises
A promise is an object that represents a future action and its result. This is in contrast to callbacks which are just conventions around how we use functions.
Let’s take a look to see how we can use fs.readFile() with promises instead of callbacks:
const fs = require('fs').promises
const filename = '07-read-file-promises.js'
fs.readFile(filename)
  .then(data => console.log(`${filename}: ${data.length}`))
  .catch(err => console.error(err))
vs:
const fs = require('fs')
const filename = '03-read-file-callback.js'

fs.readFile(filename, (err, fileData) => {
  if (err) return console.error(err)
  console.log(`${filename}: ${fileData.length}`)
})
So far, the biggest difference is that the promise has separate methods for success and failure. Unlike callbacks, which are a single function with both error and results arguments, promises have separate methods then() and catch(). If the action is successful, then() is called with the result. If not, catch() is called with an error.
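To make the shape of a promise concrete, here’s a minimal sketch of building one by hand around setTimeout(): the value passed to resolve() is what then() receives, and an error passed to reject() is what catch() receives.

const delay = ms =>
  new Promise((resolve, reject) => {
    if (ms < 0) return reject(new Error('ms must be non-negative'))
    setTimeout(() => resolve(`waited ${ms}ms`), ms)
  })

delay(1000)
  .then(result => console.log(result)) // 'waited 1000ms'
  .catch(err => console.error(err))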
However, let’s now replicate our previous challenge of reading a directory and printing all the file lengths, and we’ll begin to see how promises can be helpful.
Real World Promises
Just looking at the above example, we might be tempted to solve the challenge like this:
const fs = require('fs').promises

fs.readdir('./')
  .catch(err => console.error(err))
  .then(files => {
    files.forEach(function (file) {
      fs.readFile(file)
        .catch(err => console.error(err))
        .then(data => console.log(`${file}: ${data.length}`))
    })

    console.log('done!')
  })
Unfortunately, when used this way, we’ll run into the same issue with promises as we did with callbacks. Within the files.forEach() iterator, our use of the fs.readFile() promise is non-blocking. This means that we’re going to see done! printed to the terminal before any of the results, and we’re going to get the results out of order.
To be able to perform multiple async actions concurrently, we’ll need to use Promise.all(). Promise.all() is globally available in Node.js, and it executes an array of promises at the same time. It’s conceptually similar to the mapAsync() function we built. After all promises have been completed, it will return with an array of results.
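As a quick illustration of Promise.all() on its own (the values and delays here are arbitrary), note that the results array preserves the order of the input promises, even if they settle at different times:

const fast = new Promise(resolve => setTimeout(() => resolve('fast'), 100))
const slow = new Promise(resolve => setTimeout(() => resolve('slow'), 500))

Promise.all([slow, fast]).then(results => console.log(results))
// => [ 'slow', 'fast' ] (printed after ~500ms, in input order)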
Here’s how we can use Promise.all() to solve our challenge:
const fs = require('fs').promises
fs.readdir('./')
  .then(fileList =>
    Promise.all(
      fileList.map(file => fs.readFile(file).then(data => [file, data.length]))
    )
  )
  .then(results => {
    results.forEach(([file, length]) => console.log(`${file}: ${length}`))
    console.log('done!')
  })
  .catch(err => console.error(err))
After we receive the file list fileList from fs.readdir(), we use fileList.map() to transform it into an array of promises. Once we have an array of promises, we use Promise.all() to execute them all in parallel.
One thing to notice about our transformation is that we are not only transforming each file name into a fs.readFile( ) promise, but we are also customizing the result:
fileList.map(file => fs.readFile(file).then(data => [file, data.length]))
If we had left the transformation as:
fileList.map(file => fs.readFile(file))

then when Promise.all() finishes, results would simply be an array of file data. Without also having the file names, we would no longer know which file each length belongs to. In order to keep each length properly labeled, we need to modify what each promise returns. We do this by adding a .then() that returns [file, data.length].
Creating A Function
Now that we’ve solved the challenge for a single directory, we can create a generalized function that can be used for any directory. Once we’ve generalized it, we can use it like this:
const targetDirectory = process.argv[2] || './'

getFileLengths(targetDirectory)
  .then(results => {
    results.forEach(([file, length]) => console.log(`${file}: ${length}`))
    console.log('done!')
  })
  .catch(err => console.error(err))
Our new getFileLengths() is similar to what we did in the previous section. First we read the directory list, and then we use fileList.map() to transform that list into an array of promises. However, just like our callback example, we need some extra logic to handle arbitrary directories. Before creating a promise to read a file, we use path.join() to combine the directory with the file name to create a usable file path.
function getFileLengths (dir) {
  return fs.readdir(dir).then(fileList => {
    const readFiles = fileList.map(file => {
      const filePath = path.join(dir, file)
      return readFile(filePath)
    })

    return Promise.all(readFiles)
  })
}
Just like our callback example, we use a customized readFile() function so that we can ensure both that our final results array is made up of file path and file length pairs, and that subdirectories are correctly handled. Here’s what that looks like:
function readFile (filePath) {
  return fs
    .readFile(filePath)
    .then(data => [filePath, data.length])
    .catch(err => {
      if (err.code === 'EISDIR') return [filePath, 0]
      throw err
    })
}
The promise version of readFile() behaves similarly, but the implementation is a little different. As mentioned above, one of the biggest differences between callbacks and promises is error handling. In contrast to callbacks, the success and error paths of promises are handled with separate functions.
When then() is called, we can be certain that we have not run into an error. We will have access to the file’s data, and we can return a [filePath, data.length] pair as the result.
Our callback example was able to return early with a value if we encountered a particular error code (EISDIR). With promises, we need to handle this differently.

With promises, errors flow into the separate catch() function. We can intercept EISDIR errors and prevent them from breaking the chain. If the error code is EISDIR, we return our modified result, [filePath, 0]. By using return within a catch(), we prevent the error from propagating. To downstream code, it will look like the operation successfully returned this result.
If any other error is thrown, we make sure not to return with a value. Instead, we re-throw the error. This will propagate the error down the chain — successive then() calls will be skipped, and the next catch() will be run instead.
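Here’s a minimal sketch of that propagation behavior, separate from our file-reading code: returning a value from a catch() recovers, while re-throwing skips the next then() and lands in the following catch().

Promise.reject(new Error('a recoverable error'))
  .catch(() => 'recovered')           // returning a value resumes the chain
  .then(value => console.log(value))  // 'recovered'

Promise.reject(new Error('some other error'))
  .catch(err => { throw err })        // re-throwing keeps the chain rejected
  .then(() => console.log('skipped')) // never runs
  .catch(err => console.error(err.message)) // 'some other error'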
Each call to readFile() will return a promise that resolves to a file path and length pair. Therefore, when we use Promise.all() on an array of these promises, we will ultimately end up with an array of these pairs — our desired outcome.
Here’s what the full file looks like:
const fs = require('fs').promises
const path = require('path')

const targetDirectory = process.argv[2] || './'

getFileLengths(targetDirectory)
  .then(results => {
    results.forEach(([file, length]) => console.log(`${file}: ${length}`))
    console.log('done!')
  })
  .catch(err => console.error(err))

function getFileLengths (dir) {
  return fs.readdir(dir).then(fileList => {
    const readFiles = fileList.map(file => {
      const filePath = path.join(dir, file)
      return readFile(filePath)
    })

    return Promise.all(readFiles)
  })
}

function readFile (filePath) {
  return fs
    .readFile(filePath)
    .then(data => [filePath, data.length])
    .catch(err => {
      if (err.code === 'EISDIR') return [filePath, 0]
      throw err
    })
}
Wrapping Up
Promises give us new ways of handling sequential and parallel async tasks, and we can take advantage of chaining .then() and .catch() calls to compose a series of tasks into a single function.
Compared to callbacks, we can see that promises have two nice properties. First, we did not need to use our own mapAsync() function — Promise.all() is globally available and handles that functionality for us. Second, errors are automatically propagated along the promise chain for us. When using callbacks, we need to check for errors and use early returns to manage this ourselves.
In the next section we’ll build on top of promises to show off the use of the async and await language features. These allow us to use promises as if they were synchronous.
Async & Await
What if we could have the non-blocking performance of asynchronous code, but with the simplicity and familiarity of synchronous code? In this section we’ll show how we can get a bit of both.
The async and await keywords allow us to treat specific uses of promises as if they were synchronous. Here’s an example of using them to read data from a file:
const fs = require('fs').promises

printLength('10-read-file-await.js')

async function printLength (file) {
  try {
    const data = await fs.readFile(file)
    console.log(`${file}: ${data.length}`)
  } catch (err) {
    console.error(err)
  }
}
One cool thing about this is that we can use standard synchronous language features like try/catch. Even though fs.readFile() is a promise (and therefore asynchronous), we’re able to wrap it in a try/catch block for error handling — just like we would be able to do for synchronous code. We don’t need to use catch() for error handling.
In fact, we don’t need to use then() either. We can directly assign the result of the promise to a variable and use it on the following line.

However, it’s important to note that we can only do these things within special async functions. Because of this, when we declare our printLength() function we use this syntax:
async function printLength (file) { ... }
Once we do that, we are able to use the await keyword within. For the most part, await allows us to treat promises as synchronous. As seen above, we can use try/catch and variable assignment. Most importantly, even though our code will run as if these operations are synchronous, they won’t block other executing tasks.
In many cases this can be very helpful and can simplify our code, but there are still gotchas to be aware of. Let’s go through our directory reading challenge one last time and take a look.
Real World Async/Await
Just like before we’re going to first get a directory list, then get each file’s length, print those lengths, and after that’s all finished, we’ll print ‘done!’.
For those of us who are new to asynchronous programming in Node.js, it might have felt a bit complicated to perform these tasks using callbacks or promises. Now that we’ve seen async and await, it’s tempting to think that we’ll be able to handle this task in a much more straightforward way.
Let’s look at how we might try to solve this challenge with async and await:
const fs = require('fs').promises

printLengths('./')

async function printLengths (dir) {
  const fileList = await fs.readdir(dir)

  const results = fileList.map(
    async file => await fs.readFile(file).then(data => [file, data.length])
  )

  results.forEach(result => console.log(`${result[0]}: ${result[1]}`))

  console.log('done!')
}
Unfortunately, this won’t work. If we run node 11-read-dir-await-broken.js we’ll see something like this:
$ node 11-read-dir-await-broken.js
undefined: undefined
undefined: undefined
undefined: undefined
undefined: undefined
undefined: undefined
What happened to our file names and lengths? The problem is our use of fileList.map(). Even though we specify the iterator function as async so that we can use await each time we call fs.readFile(), we can’t use await on the call to fileList.map() itself. This means that Node.js will not wait for each promise within the iterator to complete before moving on. Our results array will not be an array of file name and length pairs; it will be an array of promises.

When we iterate over our results array, each item is a promise, not a [file, length] pair. Accessing those positions on a promise gives us undefined, which is why we see undefined: undefined printed for each file.
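The underlying rule is that calling an async function always returns a promise immediately, whether or not we await anything inside it. A tiny sketch:

const getAnswer = async () => 42

const p = getAnswer()
console.log(p) // Promise { 42 }

p.then(answer => console.log(answer)) // 42

So fileList.map(async file => ...) hands us back an array of promises, not an array of [file, length] pairs.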
Luckily, the fix is simple. In the last section we used Promise.all() to treat an array of promises as a single promise. Once we convert the array of promises to a single promise, we can use await as we expect. Here’s what that looks like:
const fs = require('fs').promises

printLengths('./')

async function printLengths (dir) {
  const fileList = await fs.readdir(dir)

  const results = await Promise.all(
    fileList.map(file => fs.readFile(file).then(data => [file, data.length]))
  )

  results.forEach(([file, length]) => console.log(`${file}: ${length}`))

  console.log('done!')
}
Creating Async/Await Functions
Since printLengths() accepts a directory argument, it may look like we’ve already created a generalized solution to this problem. However, our solution has two issues. First, it is currently unable to properly handle subdirectories, and second, unlike our previous generalized solutions, our printLengths() function will not return the files and lengths — it will only print them.

Like we’ve done with our promise and callback examples, let’s create a generalized getFileLengths() function that can work on arbitrary directories and will return with an array of file and length pairs.

We need to keep printLengths() because we can’t take advantage of await outside of an async function. However, within printLengths() we will call our new getFileLengths() function, and unlike before, we can take advantage of async and await to both simplify how our code looks and to use try/catch for error handling:
const targetDirectory = process.argv[2] || './'

printLengths(targetDirectory)

async function printLengths (dir) {
  try {
    const results = await getFileLengths(dir)

    results.forEach(([file, length]) => console.log(`${file}: ${length}`))
    console.log('done!')
  } catch (err) {
    console.error(err)
  }
}
Unlike our previous promise example of getFileLengths(), we don’t need to use then() or catch(). Instead, we can use direct assignment for results and try/catch for errors.
Let's take a look at our async/await version of getFileLengths():
async function getFileLengths (dir) {
  const fileList = await fs.readdir(dir)

  const readFiles = fileList.map(async file => {
    const filePath = path.join(dir, file)
    return await readFile(filePath)
  })

  return await Promise.all(readFiles)
}
Like before, we can do direct assignment of fileList without using then() or catch(). Node.js will wait for fs.readdir() to finish before continuing. If there’s an error, Node.js will throw, and it will be caught in the try/catch block in printLengths().

Also, just like our callback and promise versions, we’re going to use a customized readFile() function so that we can handle subdirectories. Here’s what that looks like:
async function readFile (filePath) {
  try {
    const data = await fs.readFile(filePath)
    return [filePath, data.length]
  } catch (err) {
    if (err.code === 'EISDIR') return [filePath, 0]
    throw err
  }
}
We’re able to return a value from within our catch block. This means that we can return [filePath, 0] if we encounter a directory. However, if we encounter any other error type, we can throw again. This will propagate the error onwards to any catch block surrounding the use of this function. This is conceptually similar to how we selectively re-threw in the promises example.

Once readFile() has been called for each file in the fileList array, we’ll have an array of promises, readFiles — calling an async function returns a promise. We then return await Promise.all(readFiles); this will be an array of results from each of the readFile() calls.

And that’s all we need. If there’s an issue in any of the readFile() calls within the Promise.all(), the error will propagate up to where we call getFileLengths(dir) in printLengths() — which can be caught in the try/catch there.
Here’s the full generalized solution to the challenge:
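(The listing below simply reassembles the pieces shown above; the require lines are assumed to match the earlier promise version.)

const fs = require('fs').promises
const path = require('path')

const targetDirectory = process.argv[2] || './'

printLengths(targetDirectory)

async function printLengths (dir) {
  try {
    const results = await getFileLengths(dir)

    results.forEach(([file, length]) => console.log(`${file}: ${length}`))
    console.log('done!')
  } catch (err) {
    console.error(err)
  }
}

async function getFileLengths (dir) {
  const fileList = await fs.readdir(dir)

  const readFiles = fileList.map(async file => {
    const filePath = path.join(dir, file)
    return await readFile(filePath)
  })

  return await Promise.all(readFiles)
}

async function readFile (filePath) {
  try {
    const data = await fs.readFile(filePath)
    return [filePath, data.length]
  } catch (err) {
    if (err.code === 'EISDIR') return [filePath, 0]
    throw err
  }
}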
Wrapping Up
We have now solved the same real-world challenge with three different techniques for handling asynchronous tasks. The biggest difference with async/await is being able to use a more synchronous coding style and try/catch for error handling.
It’s important to remember that under the hood, async/await is using promises. When we declare an async function, we’re really creating a promise. This can be seen most clearly with our use of Promise.all().
We’re now going to move beyond callbacks, promises, and async/await. Each one of these styles is focused on performing “one and done” asynchronous tasks. Next, we’ll turn our attention to ways of handling the types of repeated asynchronous work that are very common in Node.js: event emitters and streams.
Event Emitters
Event emitters are not new to Node.js. In fact, we’ve already used an example that’s common in the browser:
window.addEventListener('resize', () => console.log('window has been resized!'))
It’s true that, like callbacks and promises, adding event listeners allows us to create logic around future timelines, but the big difference is that events are expected to repeat.
Callbacks and promises have an expectation that they will resolve once and only once. If you recall from our callback example, we needed to add extra logic to mapAsync() to ensure that multiple errors would not cause the callback to be called multiple times.
Event emitters, on the other hand, are designed for use-cases where we expect a variable number of actions in the future. If you recall from our chat example when we built the API, we used an event emitter to handle chat message events. Chat messages can happen repeatedly or not at all. Using an event emitter allows us to run a function each time one occurs.
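As a minimal sketch of that repeatable behavior (the event name and messages here are just for illustration), here’s a bare EventEmitter from the core events module; the listener runs once per emit:

const EventEmitter = require('events')

const chat = new EventEmitter()

chat.on('message', msg => console.log(`new message: ${msg}`))

chat.emit('message', 'hello')   // new message: hello
chat.emit('message', 'anyone?') // new message: anyone?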
We didn’t dive into the details of the chat app’s event emitter at the time, but let’s take a closer look now. We’ll create a command line interface where a user can type messages. We’ll use an event emitter to run a function each time the user enters in a full message and presses “return”.
Event Emitters: Getting Started
When we built our chat app, we created our own event emitter. Many core Node.js methods provide a callback interface, but many also provide an event emitter interface.
For our first example, we’re going to use the core module readline. Readline allows us to “watch” a source of data and listen to “line” events. For our use-case we’re going to watch process.stdin, a data source that will contain any text that a user types in while our Node.js app is running, and we’re going to receive “line” events any time the user presses “return.”
Each time we receive a message, we’re going to transform that message to all uppercase and print it out to the terminal.
Here’s how we can do that:
const readline = require('readline')
const rl = readline.createInterface({ input: process.stdin })
rl.on('line', line => console.log(line.toUpperCase()))
If we run it, we can get something like the following:
The lower case text is our input, and the uppercase is printed out by our script.
Once we create an instance of readline, we can use its on() method to listen for particular types of events. For right now, we’re interested in the line event type.

readline is a core Node.js module. You can see all the other event types that are available in the official Node.js documentation.
Creating event handlers for specific event types is similar to using callbacks, except that we don’t receive an error argument. This is just like how it works in the browser when we use document.addEventListener('click', event => {}). The event handler function does not expect an error argument.
In addition to basic logging, we can use event handlers to perform other work. In the next section, we’ll see how we can use our event emitter to provide another interface to the chat app we built in the first chapter.
Event Emitters: Going Further
In the first chapter we built a chat app. We could open two different browsers, and anything we typed into one window would appear in the other. We’re going to show that we can do this without a browser.
By making a small tweak to our readline example, we’ll be able to open our chat app in a browser window and send messages to it using our terminal:
const http = require('http')
const readline = require('readline')
const querystring = require('querystring')

const rl = readline.createInterface({ input: process.stdin })

rl.on('line', line =>
  http.get(
    `http://localhost:1337/chat?${querystring.stringify({ message: line })}`
  )
)
For this to work, we need to make sure that our server from chapter 1 is running and listening on port 1337. You can do this by opening a new terminal tab, navigating to the chapter 1 code directory, and running node 07-server.js. You should see a message saying Server listening on port 1337.
The code hasn’t changed much. We have only replaced our console.log() with a call to http.get(). We use the same built-in http module that we used in chapter 1. However, this time we’re using http to create requests instead of responding to them.
To ensure that our messages are properly formatted and special characters are escaped, we also use the built-in querystring module.
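For example, querystring.stringify() percent-encodes anything that isn’t safe to put in a URL (the message text here is arbitrary):

const querystring = require('querystring')

console.log(querystring.stringify({ message: 'hello there?' }))
// => message=hello%20there%3F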
We can now run our server from chapter 1, open the chat app in a browser window, run node 15-ee-readline-chat-send.js, and start typing messages into the terminal. Each message should appear in the open browser window:
What’s cool about this is that it shows off how easily Node.js can be used to chain functionality together. By creating small pieces of functionality with well-defined interfaces, it’s very easy to get new functionality for free.
When we built our chat app in chapter 1, we didn’t plan on wanting a CLI client. However, because the interface is straightforward, we were easily able to get that functionality.
In the next section we’ll take it even further. We’re going to create our own event emitter object, and not only will we be able to send messages, but we’ll be able to receive them as well.
Event Emitters: Creating Custom Emitters
Now that we’ve added the capability to send messages, we can also add some functionality to receive them.
The only thing we need to do is to make an HTTP request to our API and log the messages as they come in. In the previous section, we made HTTP requests to send messages, but we didn’t do anything with the response. To handle response data from the HTTP request, we’ll need to use both a callback and an event emitter:
http.get('http://localhost:1337/sse', res => {
  res.on('data', data => console.log(data.toString()))
})
http.get() is interesting because it accepts a callback as its second argument — and the response argument (res) the callback receives is an event emitter.

What’s going on here is that after we make the request, we need to wait for the response. The response object doesn’t come with all of the data; instead, we have to subscribe to its “data” events. Each time we receive more data, that event will fire, passing along the newest chunk.

NOTE: You’ll notice that we call data.toString() to log our chat messages. If we didn’t do that, we would see the raw bytes in hex. For efficiency, Node.js often defaults to a data type called Buffer. We won’t go into detail here, but it’s easy enough to convert buffers into strings using toString().
Here’s what the full file looks like with our addition:
const http = require('http')
const readline = require('readline')
const querystring = require('querystring')

const rl = readline.createInterface({ input: process.stdin })

rl.on('line', line => {
  http.get(
    `http://localhost:1337/chat?${querystring.stringify({ message: line })}`
  )
})

http.get('http://localhost:1337/sse', res => {
  res.on('data', data => console.log(data.toString()))
})
If we run it with node 16-ee-readline-chat-receive.js, we will be able to see any messages we type into the browser:
This works, but we can do better. Each message is prefixed with data: and is followed by two newlines. This is expected because it’s the data format of the Server-sent events specification.
We’re going to create a simple interface where we can pull messages from any SSE endpoint, and we’re going to use the power of event emitters to make this really easy.
Instead of just making an HTTP request and logging what we receive, we’ll create our own general-purpose event emitter that will give us custom “message” events without the extra stuff.
To do this we’re going to create a function that returns a new event emitter. However, before this function returns the event emitter, it will also set up the HTTP connection to receive chat messages. When those messages come in, it will use the new event emitter to emit them.
This is a useful pattern because it allows us to synchronously create an object that will act asynchronously. In some ways, this is similar to how a promise works.
Here's our new createEventSource( ) function:
function createEventSource (url) {
  const source = new EventEmitter()

  http.get(url, res => {
    res.on('data', data => {
      const message = data
        .toString()
        .replace(/^data: /, '')
        .replace(/\n\n$/, '')

      source.emit('message', message)
    })
  })

  return source
}
And here’s how we can use it:
const source = createEventSource('http://localhost:1337/sse')
source.on('message', console.log)
Because we’re cleaning up the data before we pass it to our new event emitter, when we log messages they are trimmed down to just the parts we care about:
This is great, but one of the biggest advantages of event emitters is that they can emit lots of different types of events. Instead of just emitting a “message” event, we can go further and emit different events depending on the content of the message.
There’s no limit to the number or types of events that we can emit. For example, we can look at a chat message to see if it ends with a question mark. If it does, we can emit a “question” event, and if not, we can emit a “statement” event. Furthermore, these can be in addition to our existing “message” event.
This is nice because it gives choice to the consumer of our event emitter. The consumer can choose which events they would like to subscribe to.
Here’s the same function with a small amount of logic added to determine the event type for an additional emit() call:
function createEventSource (url) {
  const source = new EventEmitter()

  http.get(url, res => {
    res.on('data', data => {
      const message = data
        .toString()
        .replace(/^data: /, '')
        .replace(/\n\n$/, '')

      source.emit('message', message)

      const eventType = message.match(/\?$/) ? 'question' : 'statement'
      source.emit(eventType, message)
    })
  })

  return source
}
We can choose to only listen for questions, and change our language to suit:
source.on('question', q => console.log(`Someone asked, "${q}"`))
Event Emitters: Wrapping Up
The big advantage of event emitters is that they allow us to handle async behavior when the future action is either uncertain or repetitive. Unlike callbacks and promises (and therefore async/await), event emitters are not as useful for “one and done”-type behaviors.
In addition, by encapsulating filtering behavior into the event emitter itself, we can make very clean interfaces for consumers.
Next up we’re going to learn about one more common form of async behavior in Node.js which is actually a specific type of event emitter: streams.
Streams
Streams are ubiquitous within Node.js. In fact, almost all Node.js applications, no matter how simple, use streams in some manner[^From https://nodejs.org/docs/latest-v11.x/api/stream.html#stream_api_for_stream_consumers]. Our apps are no exception; we’ve already used them a number of times.
Streams are a specific type of event emitter. This means that they have the same methods available, such as emit() and on(). However, what makes streams so useful is that they follow conventions about how their events are emitted.

This is easiest to see with an example. Let’s take a look at a stream object that we just encountered: the http response object.
http.get(url, res => {
  res.on('data', data => {
    // here's where we use the data
  })
})
In the snippet above, res is a stream, and because streams are event emitters, we can use on() to listen to its events and receive updates.
What’s cool about streams is that they standardize on the types of events that they emit. This makes them very predictable to use, and allows us to chain them together in interesting ways.
When we created our event emitter, we arbitrarily decided that we would emit a “message”, “question”, and “statement” event types. This means that any consumer of our event emitter would have to look at the code or at documentation to know to subscribe to those event types.
On the other hand, when reading from a stream, we can always expect “data”, “error”, and “end” events. Just by knowing that res is a stream, we know that we can get its data with the “data” event.
For example, here’s what it looks like to download a file using an https response stream’s events:
const fs = require('fs')
const https = require('https')

const fileUrl =
  'https://www.fullstackreact.com/assets/images/fullstack-react-hero-book.png'

https.get(fileUrl, res => {
  const chunks = []

  res.on('data', data => chunks.push(data)).on('end', () =>
    fs.writeFile('book.png', Buffer.concat(chunks), err => {
      if (err) console.error(err)

      console.log('file saved!')
    })
  )
})
NOTE: By default, Node.js uses Buffer objects to store and transmit data because it’s more efficient. We’ll go over Buffer more later, but for now just know that (1) Buffer.concat() can convert an array of Buffer objects into a single Buffer, and (2) fs.writeFile() is happy to accept a Buffer as its argument for what should be written to a file.
The way we do this makes sense when we think about the response stream as an event emitter. We don’t know ahead of time how many times the “data” event will be emitted. If this file is small, it could happen only once. Alternatively, if the file is really large, it could happen many times. If there’s an error connecting, it may not happen at all.
Our approach here is to collect all of the data “chunks” in an array, and only once we receive the “end” event to signal that no more are coming, do we proceed to write the data to a file.
This is a very common pattern for performing a batched write. As each chunk of data is received, we store it in memory, and once we have all of them, we write them to disk.
The downside of batching is that we need to be able to hold all of the data in memory. This is not a problem for smaller files, but we can run into trouble when working with large files — especially if a file is larger than our available memory. Often, it’s more efficient to write data as we receive it.
In addition to readable streams, there are also writable streams. For example, here’s how we can create a new file (time.log) and append to it over time:
const fs = require('fs')

const writeStream = fs.createWriteStream('time.log')

setInterval(() => writeStream.write(`The time is now: ${new Date()}\n`), 1000)
Writable streams have write() and end() methods. In fact, we’ve already seen these in chapter 1. The HTTP response object is a writable stream. For most of our endpoints we send data back to the browser using res.end(). However, when we wanted to keep the connection open for SSE, we used res.write() so that we did not close the connection.
Let’s change our previous example to use a writable stream to avoid buffering the image data in memory before writing it to disk:
const fs = require('fs')
const https = require('https')

const fileUrl =
  'https://www.fullstackreact.com/assets/images/fullstack-react-hero-book.png'

https.get(fileUrl, res => {
  const fileStream = fs.createWriteStream('book.png')

  res.on('data', data => fileStream.write(data)).on('end', () => {
    fileStream.end()
    console.log('file saved!')
  })
})
As we can see, we were able to eliminate the chunks array that we used to store all the file data in memory. Instead, we write each chunk of data to disk as it comes in.
The beauty of streams is that because all of these methods and events are standardized, we actually don’t need to listen to these events or call the methods manually.
Streams have an incredibly useful pipe() method that takes care of all of this for us. Let’s do the same thing, but instead of setting up our own handlers, we’ll use pipe():
const fs = require('fs')
const https = require('https')

const fileUrl =
  'https://www.fullstackreact.com/assets/images/fullstack-react-hero-book.png'

https.get(fileUrl, res => {
  res
    .pipe(fs.createWriteStream('book.png'))
    .on('finish', () => console.log('file saved!'))
})
Streams provide us with a very efficient way of transferring data, and using pipe() we can do this very succinctly and easily.
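One caveat worth noting: pipe() on its own does not forward errors from the source stream to the destination, so each stream still needs its own “error” handler. If you’re on Node.js 10 or later, the core stream module also provides pipeline(), which wires the streams together and reports any error to a single callback. Here’s a sketch of the same download using it (this pipeline() variant is our own aside, not one of the chapter’s listings):

const fs = require('fs')
const https = require('https')
const { pipeline } = require('stream')

const fileUrl =
  'https://www.fullstackreact.com/assets/images/fullstack-react-hero-book.png'

https.get(fileUrl, res => {
  // pipeline() pipes res into the file stream and calls back once, on success or error
  pipeline(res, fs.createWriteStream('book.png'), err => {
    if (err) return console.error(err)
    console.log('file saved!')
  })
})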
Composing Streams
So far we’ve seen readable streams and writable streams, and we’ve learned that we can connect readable streams to writable streams via pipe().
This is useful for moving data from one place to another, but often we’ll want to transform the data in some way. We can do this using transform streams.
A transform stream behaves as both a readable stream and a writable stream. Therefore, we can pipe a read stream to a transform stream, and then we can pipe the transform stream to the write stream.
For example, if we wanted to efficiently transform the text in a file to upper case, we could do this:
const fs = require('fs')
const { Transform } = require('stream')

fs.createReadStream('23-streams-shout.js')
  .pipe(shout())
  .pipe(fs.createWriteStream('loud-code.txt'))

function shout () {
  return new Transform({
    transform (chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase())
    }
  })
}
In this case we’ve created a function shout() that creates a new transform stream. This transform stream is both readable and writable. This means that we can pipe our read stream that we get from fs.createReadStream() to it, and we can also pipe it to our write stream from fs.createWriteStream().
Our transform stream is created with a simple function that expects three arguments. The first is the chunk of data, which we’re already familiar with from our use of on('data'). The second is the encoding, which is useful if the chunk is a string. However, because we have not changed any default behaviors of our read stream, we expect this value to be “buffer” and we can ignore it. The final argument is a callback to be called with the result of transforming the chunk. The value provided to the callback will be emitted as data to anything reading from the transform stream. The callback can also be used to emit an error. You can read more about transform streams in the official Node.js documentation.
In this particular case we are performing a synchronous transformation. However, because we are given a callback, we are also able to do asynchronous transformations. This is useful if you need to look up information from disk or from a remote network service like an HTTP API.
If we run this script, and we open the resulting file, we’ll see that all the text has been transformed to upper case.
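To make the asynchronous point above concrete, here’s a hedged sketch of an async variant of shout(). The slowUpperCase() helper is made up for illustration; it stands in for any callback-based lookup such as a database query or an HTTP request:

const { Transform } = require('stream')

function shoutAsync () {
  return new Transform({
    transform (chunk, encoding, callback) {
      // defer to an asynchronous helper instead of transforming inline
      slowUpperCase(chunk.toString(), (err, result) => {
        if (err) return callback(err)
        callback(null, result)
      })
    }
  })
}

// Hypothetical async operation: simulates a slow service call with setTimeout()
function slowUpperCase (text, callback) {
  setTimeout(() => callback(null, text.toUpperCase()), 100)
}

// Example usage: shout whatever is piped in on stdin
process.stdin.pipe(shoutAsync()).pipe(process.stdout)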
Of course, in real life we don’t often need to perform streaming case changes, but this shows how we can create general-purpose modular transform streams that can be used with a wide range of data.
In the next section we’ll take a look at a transform stream that is useful in the real world.
Real World Transform Streams
A common use-case in the real world is converting one source of data to another format. When the source data is large, it’s useful to use streams to perform the transformation.
Let’s look at an example where we have a csv file, and we’d like to change a number of things:
1) “name” should be replaced with two separate “firstName” and “lastName” fields
2) the “dob” should be converted to an “age” integer field
3) the output should be newline delimited JSON instead of csv
Here’s some data from our example people.csv file:
name,dob
Liam Jones,1988-06-26
Maximus Rau,1989-08-21
Lily Ernser,1970-01-18
Alvis O'Keefe,1961-01-19
Here’s what we’d like people.ndjson to look like when we’re finished:
( "firstName" "Liam" , "lastName" "Jones" , "age" : 30}
( "firstName" "Maximus" , "lastName" "Rau ” , "age" : 29)
( "firstName" "Lily" , "lastName" "Ernser" , "age" : 49)
( "firstName" "Alvis", "lastName" "0 ' Kee be" , "age" : 58)
( "firstName" "Amy" , "lastName" "Johnson" , "age" : 59}
Just like before, the first thing that we need to do is to use fs.createReadStream() to create a readable stream object for our people.csv file:
fs.createReadStream('people.csv')
Next, we want to pipe this stream to a transform stream that can parse csv data. We could create our own, but there’s no need. We’re going to use an excellent and appropriately named module, csv-parser, that is available on npm. However, before we can use it in our code, we need to run npm install csv-parser from our code directory.
Once that is installed we can use it like so:
const fs = require('fs')
const csv = require('csv-parser')

fs.createReadStream('people.csv')
  .pipe(csv())
  .on('data', row => console.log(JSON.stringify(row)))
When we run this, we’ll see that the transform stream created with csv() emits data events, and each logged event is an object representing a parsed csv row:
( "name" : "Liann Jones" , "dob" : "t988-06-26" )
( "name" : "Maximus Rau" , "dob" : "t989 -08-24 " )
( "name" : "LiIy Ernser" , "dob" : "t970-01-t8" )
( "name" : "AIvis Kee fe" , "dob" : "t96t -0t -t9" )
( "name" : "Amy Johnson" , "dob" : "t960-03-04" )
By using console.log() on each JSON stringified row object, our output format is newline delimited JSON, so we’re almost finished already. The only thing left to do is to add another transform stream into the mix to convert the objects before they are logged as JSON.
This will work the same way as our previous transform stream example, but with one difference. Streams are designed to work on String and Buffer types by default. In this case, our csv-parser stream is not emitting those types; it is emitting objects.
If we were to create a default transform stream with clean():
const fs = require('fs')
const csv = require('csv-parser')
const { Transform } = require('stream')

fs.createReadStream('people.csv')
  .pipe(csv())
  .pipe(clean())
  .on('data', row => console.log(JSON.stringify(row)))

function clean () {
  return new Transform({
    transform (row, encoding, callback) {
      callback(null, row)
    }
  })
}
We would get the following error:
$ node 24-transform-csv-error.js
events.js:174
      throw er; // Unhandled 'error' event
      ^

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type string or \
Buffer. Received type object
    at validChunk (_stream_writable.js:263:10)
    at Transform.Writable.write (_stream_writable.js:297:21)
Instead, we need to make sure that the objectMode option is set to true:
return new Transform({
  objectMode: true,
  transform (row, encoding, callback) { ... }
})
With that issue out of the way, we can create our transform() function. The two things we need to do are to convert the single “name” field into separate “firstName” and “lastName” fields, and to change the “dob” field to an “age” field. Both are easy with some simple string manipulation and date math:
transform (row, encoding, callback) {
  const [firstName, lastName] = row.name.split(' ')
  const age = Math.floor((Date.now() - new Date(row.dob)) / YEAR_MS)

  callback(null, {
    firstName,
    lastName,
    age
  })
}
Now, when our transform stream emits events, they will be properly formatted and can be logged as JSON:
const fs = require('fs')
const csv = require('csv-parser')
const { Transform } = require('stream')

const YEAR_MS = 365 * 24 * 60 * 60 * 1000

fs.createReadStream('people.csv')
  .pipe(csv())
  .pipe(clean())
  .on('data', row => console.log(JSON.stringify(row)))

function clean () {
  return new Transform({
    objectMode: true,
    transform (row, encoding, callback) {
      const [firstName, lastName] = row.name.split(' ')
      const age = Math.floor((Date.now() - new Date(row.dob)) / YEAR_MS)

      callback(null, {
        firstName,
        lastName,
        age
      })
    }
  })
}
Now when we run this with node 24-transform-csv.js > people.ndjson, our csv rows will be transformed and the newline delimited JSON will be written to people.ndjson:
The data in our csv file is transformed and converted to ndjson
Streams: Wrapping Up
In this section we’ve seen how to use streams to efficiently transform large amounts of data. In the real world, we’ll typically receive some large file or have to deal with a large data source, and it can be infeasible to process it all at once. Sometimes we need to change the format (e.g. from csv to ndjson), and other times we need to clean or modify the data. In either case, transform streams are a great tool to have at our disposal.
Async Final Words
In this chapter we’ve explored many of the different async patterns available in Node.js. Not all of them are appropriate in all circumstances, but all of them are important to be familiar with.
As we begin to build more functionality into our applications and services, it’s important to know how to be efficient and what tools we have available.
While callbacks and promises serve a similar purpose, it’s important to be familiar with both so that we can use Node.js core APIs as well as third-party modules. Similarly, using async/await can make certain types of operations cleaner to write, but using async functions can impose other restrictions on our code, and it’s great to have alternatives when it’s not worth the trade-off.
Additionally, we’ve learned about event emitters and streams, two higher-level abstractions that allow us to work with multiple future actions and deal with data in a more efficient, continuous way.
All of these techniques will serve us well as we continue to build with Node.js.