
MongoDB ChangeStreams - Resuming

  • Writer: Rishaab
  • Jun 10, 2021

Updated: Jul 16


Today we are going to explore a very powerful feature of change streams: the resume token.


For motivation, suppose your application uses change streams to receive events. It receives events until time t1, then restarts (because of a crash) and opens the change stream again at time t2. It now receives the events that occur after t2, and everything is hunky-dory. But wait! What about the events that occurred between t1 and t2? It looks like we missed those. How do we receive them? That's where the resume token comes into play.


A resume token is essentially the _id field of a change event. We can pass a particular event's _id to watch() to tell the change stream that we want to receive the events that come after that event.
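Since the token is just the event's _id, an application can remember the most recent one as it consumes events. Here is a minimal sketch of that idea, assuming a cursor with the shell's synchronous hasNext() and next() methods. The function name readAvailable and its parameters are hypothetical, not part of MongoDB.

```javascript
// Hypothetical helper: reads the events currently available on a change
// stream cursor and returns the resume token of the last one handled.
// lastToken is whatever token the caller already holds (or null).
function readAvailable(cursor, onEvent, lastToken) {
  let token = lastToken;
  while (cursor.hasNext()) {
    const event = cursor.next();
    onEvent(event);
    // The resume token is the event's _id. It is updated only after the
    // handler has returned, so an event that fails mid-handling is not
    // counted as seen.
    token = event._id;
  }
  return token;
}
```

In the shell this could be used as var lastToken = readAvailable(cursor, printjson, null). If no event is available, the token passed in is returned unchanged.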


A resume token can be specified in either of the following ways:


Syntax with resumeAfter

var cursor = db.test.watch([], {resumeAfter: <event's _id>})

Syntax with startAfter

var cursor = db.test.watch([], {startAfter: <event's _id>})



The resumeAfter

The resumeAfter option is used to resume a change stream on a collection or a database after the specified event. Let's see how to work with it.


First, start watching the test collection and then add a few documents to the collection.

let cursor = db.test.watch()
db.test.insert({sample:"test1"})
db.test.insert({sample:"test2"})

Next, call cursor.next() just once; it should return the event for the first insert.

{
	"_id" : {
		"_data" : "8260C141F8000000012B0..."
	},
	"operationType" : "insert",
	"clusterTime" : Timestamp(1623278072, 1),
	"fullDocument" : {
		"_id" : ObjectId("60c141f8d13abda125b2c950"),
		"sample" : "test1"
	},
	"ns" : {
		"db" : "test",
		"coll" : "test"
	},
	"documentKey" : {
		"_id" : ObjectId("60c141f8d13abda125b2c950")
	}
}

A second cursor.next() would return the second event, but I will skip that part. What we are interested in is resuming the change stream after the first event.


To do that, we will use the _id field from the first event, which is the resume token, i.e.

{
    "_data" : "8260C141F8000000012B0..."
}

Assuming that we have stored this resume token in a variable named resumeToken, we can now open another change stream and pass the token to the resumeAfter option.

var resumeCursor = db.test.watch([], {resumeAfter: resumeToken})

Calling resumeCursor.next() returns the second event, i.e. the insert of the second document. The first insert is not returned.

{ 
    "_id" : {
        "_data" : "8260C141FB000000012B022C0100296E5A1004A218803030714C459C992DB04FF9FCC946645F6964006460C141FBD13ABDA125B2C9510004" 
    }, 
    "operationType" : "insert",
    "clusterTime" : Timestamp(1623278075, 1),
    "fullDocument" : {
        "_id" : ObjectId("60c141fbd13abda125b2c951"),
        "sample" : "test2"
    },
    "ns" : {
        "db" : "test",
        "coll" : "test"
    },
    "documentKey" : {
        "_id" : ObjectId("60c141fbd13abda125b2c951") 
    }
}

This means that the change stream was resumed after the first event, as expected. We can now call next() on the resumeCursor to receive further events.
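To survive the crash-and-restart scenario from the introduction, the token has to live somewhere outside the process. The sketch below is one possible way to do that, not a prescribed one: it stores the token in a separate collection and reopens the stream with it. The names saveToken, loadToken, openWatch, tokenStore and watcherId are all hypothetical. Only updateOne(), findOne() and watch() are real collection methods.

```javascript
// Hypothetical helper: upserts the latest resume token for one consumer.
// tokenStore is assumed to be a collection such as db.resumeTokens and
// watcherId any stable string that identifies this consumer.
function saveToken(tokenStore, watcherId, token) {
  tokenStore.updateOne(
    { _id: watcherId },
    { $set: { token: token } },
    { upsert: true }
  );
}

// Hypothetical helper: returns the stored token, or null if none exists.
function loadToken(tokenStore, watcherId) {
  const doc = tokenStore.findOne({ _id: watcherId });
  return doc ? doc.token : null;
}

// Hypothetical helper: resumes after the stored token when there is one,
// otherwise opens a fresh change stream.
function openWatch(collection, token) {
  const options = token ? { resumeAfter: token } : {};
  return collection.watch([], options);
}
```

On startup the application could then call openWatch(db.test, loadToken(db.resumeTokens, "test-watcher")), and call saveToken() after each event it has finished handling. Note that resuming only works while the event behind the token is still present in the oplog.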



Invalidate Event

Before we delve into the startAfter option, we first need to understand what an invalidate event is. A change stream emits an invalidate event in any of the following cases:

  • a collection is dropped

  • a collection is renamed

  • a database is dropped


An invalidate event closes the change stream cursor: after it, hasNext() on the cursor returns false and cursor.next() returns no further events. We can confirm that the cursor is closed by calling cursor.isClosed(), which should return true. Let's see this in action.


Consider our previous test collection example: a watch on the test collection followed by an insert.

let cursor = db.test.watch()
db.test.insert({sample:"test"})

cursor.next() gives us the insert event:

{
	"_id" : {
		"_data" : "8260C13C93000000012B022..."
	},
	"operationType" : "insert",
	"clusterTime" : Timestamp(1623276691, 1),
	"fullDocument" : {
		"_id" : ObjectId("60c13c9306722130ea728fa2"),
		"sample" : "test"
	},
	"ns" : {
		"db" : "test",
		"coll" : "test"
	},
	"documentKey" : {
		"_id" : ObjectId("60c13c9306722130ea728fa2")
	}
}

Let's drop the test collection.

db.test.drop()

Now, if we call cursor.next(), we will see a drop event for the collection.

{
	"_id" : {
		"_data" : "8260C13D20000000012..."
	},
	"operationType" : "drop",
	"clusterTime" : Timestamp(1623276832, 1),
	"ns" : {
		"db" : "test",
		"coll" : "test"
	}
}

One more cursor.next() and we see an invalidate event.

{
	"_id" : {
		"_data" : "8260C13D20000000012B..."
	},
	"operationType" : "invalidate",
	"clusterTime" : Timestamp(1623276832, 1)
}

At this point our cursor, and hence the change stream, is closed. We can check that by calling the isClosed() method on the cursor.

cursor.isClosed()
true


The startAfter


The resumeAfter option cannot be used with the resume token of an invalidate event, because the invalidate event closes the cursor.

Suppose you are watching a collection named test and you rename it to newTest while the change stream on test is still open. The rename generates an invalidate event. Once the change stream reaches the invalidate event, the cursor on the test collection is closed and we receive no more events for test. To keep receiving events even after the rename, we should use the startAfter option. Let's see this with an example.


Keep a watch on the collection and add a document to the collection.

let cursor = db.test.watch()
db.test.insert({sample:"test"})

Rename the collection and add a new document to the original collection name, test. The insert creates the test collection again.

db.test.renameCollection("newTest")
db.test.insert({sample: "testAfterRename"})

Advance the cursor by calling cursor.next() until we get the invalidate event, and store the resume token of the invalidate event in a variable named resumeToken.
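Rather than calling next() by hand, this step can be sketched as a small loop. The helper name readInvalidateToken is hypothetical, and the loop assumes the shell's synchronous hasNext() and next() cursor methods.

```javascript
// Hypothetical helper: advances the cursor until an invalidate event
// appears and returns that event's _id (its resume token). Returns null
// if the cursor runs out of events without producing an invalidate.
function readInvalidateToken(cursor) {
  while (cursor.hasNext()) {
    const event = cursor.next();
    if (event.operationType === "invalidate") {
      return event._id;
    }
  }
  return null;
}
```

With this in place, var resumeToken = readInvalidateToken(cursor) would give the token used in the next step.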


Open another change stream with the above resume token using the startAfter option.

cursor = db.test.watch([], {startAfter: resumeToken})

We should now see that the change stream gets past the invalidate event and returns the new document we inserted after the collection rename.

{ 
    "_id" : { 
        "_data" : "8260C21A45000000032B022C0100296E5A1004CE97256B4B694691BC6FB86E4ABEB20046645F6964006460C21A45C3B3A12998978E7B0004" 
    }, 
    "operationType" : "insert",
    "clusterTime" : Timestamp(1623333445, 3), 
    "fullDocument" : { 
        "_id" : ObjectId("60c21a45c3b3a12998978e7b"), 
        "sample" : "testAfterRename"
    },
    "ns" : { 
        "db" : "test", 
        "coll" : "test" 
    },
    "documentKey" : { 
        "_id" : ObjectId("60c21a45c3b3a12998978e7b") 
    } 
}

We can do cursor.next() to continue receiving the events.
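Putting the two options together, an application can pick the right one from the last event it saw. The following is only a sketch of that rule as described in this post. The helper name resumeOptionsFor is hypothetical.

```javascript
// Hypothetical helper: builds the watch() options for continuing after
// lastEvent. An invalidate event's token needs startAfter; for any other
// event this sketch uses resumeAfter.
function resumeOptionsFor(lastEvent) {
  if (lastEvent.operationType === "invalidate") {
    return { startAfter: lastEvent._id };
  }
  return { resumeAfter: lastEvent._id };
}
```

For example, cursor = db.test.watch([], resumeOptionsFor(lastEvent)) reopens the stream regardless of whether lastEvent was an insert or an invalidate.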



©2023 by Rishab Joshi.