Being a decentralized platform, we usually need a way to communicate between different nodes. There are multiple ways to do the same. We use blockchain events for transferring sensitive information between nodes. But for all the other information, we can use ipfs pub/sub which, is based on publisher-subscriber pattern often used to handle events in large-scale networks. Being a relatively new feature in IPFS, IPFS pubsub subscription reconnects are not officially supported. In this article, we will learn to handle the IPFS pubsub subscription disconnect in the production environment. The version of ipfs used is 0.4.16.
Let’s see how we can gracefully handle ipfs subscribe disconnects in Nodejs.
We are using the nodeJS HTTP package for making the HTTP calls as ipfs provides us an HTTP endpoint to listen to the subscribed topic. Calling an HTTP get request to provide us a response stream that can trigger multiple callbacks. The stream provides us with the following events.
We are storing each request in a subscription object by generating a random id as its key.
File subscriptions.js
In this file we will handle the ipfs subscriptions.
This file returns a subscribe method which can be used anywhere in the project to subscribe on ipfs topic.
const subscriptions = {};
module.exports.subscribe = async (topic, callback, id = null) => {
if (id == null) id = new Date().getTime() * 100 + Math.floor(Math.random() * 99);
const reqObj = http.get('http://localhost:9096/pubsub/sub?arg=${topic}&discover=false, async (res) => {
res.on("data", async (msg) => {
try {
const str = Buffer.from(msg, 'base64').toString('utf-8');
const strJSON = JSON.parse(str);
const data = strJSON.data;
callback(data);
} catch (err) {
console.log(err);
}
});
res.on("end", async () => {
if ("isCanceled" in subscriptions[id] && subscriptions[id].isCanceled == true) {
delete subscriptions[id];
return;
}
await cancel(id);
setTimeout(async () => {
try { await subscribe(topic, callback, id); } catch (_) { }
}, 10 * 1000);
});
res.on("error", async (err) => {
console.log("unable to add subscription on " + topic);
console.log(err.message);
setTimeout(async () => {
try { await subscribe(topic, callback, id); } catch (_) { }
}, 10 * 1000);
});
});
reqObj.topic = topic;
subscriptions[id] = reqObj;
console.log("subscription added on " + topic);
return id;
}
const cancel = module.exports.cancel = async (id) => {
if (id in subscriptions) {
if ("abort" in subscriptions[id]) try {
subscriptions[id].isCanceled = true;
subscriptions[id].abort();
} catch (_) { }
console.log("subscription removed on " + (subscriptions[id].topic || id));
}
}
The events we are more concerned about are Error and End.
Let us discuss the ‘error’ event first. This event is called whenever there is some error while making the HTTP call, and in this case, we only need to retry for the ipfs subscription HTTP call. In this case, we have added some delay using setTimeout and called the method recursively also passed the previous id so that the subscription object gets overwrite.
Let us discuss the ‘end’ event now. The end event is called when the request is terminated – there is no more data to be sent in the response. In this case, we also need to subscribe again on the same topic as we did in the ‘error’ event. But in this case, we also need to abort the previous stream explicitly. Suppose, if there is some data to be received in response, it should not come more than once.
Let us discuss the ‘data’ event now. The data received from ipfs is base64 encoded. After decrypting, we are parsing it as JSON as we follow a policy that we will always send JSON data in ipfs pubsub. However, the same is not mandatory. After this, the callback is called with the JSON object.
The subscription object can also be saved in a database and re-initialized upon a service restart. For our case, we’re simply storing the subscriptions in memory.