Can't Populate Big Chunk Of Data To Mongodb Using Node.js
Solution 1:
As Robbie said, streams are the way to go with this: fs.createReadStream() should be used instead of fs.readFileSync(). I'd start by creating a line reader that takes a path and whatever string/regex you want to split on:
linereader.js
var fs = require("fs");
var util = require("util");
var EventEmitter = require("events").EventEmitter;

// Emits a "line" event for every chunk of text delimited by splitOn,
// a "close" event once the file has been fully read, and "error" on failure.
function LineReader(path, splitOn) {
    EventEmitter.call(this);

    var readStream = fs.createReadStream(path);
    var self = this;
    var lineNum = 0;
    var buff = "";
    var chunk;

    readStream.on("readable", function () {
        while ((chunk = readStream.read(100)) !== null) {
            buff += chunk.toString();
            var lines = buff.split(splitOn);
            // Emit every complete line; keep the trailing partial line in the buffer.
            for (var i = 0; i < lines.length - 1; i++) {
                self.emit("line", lines[i]);
                lineNum += 1;
            }
            buff = lines[lines.length - 1];
        }
    });

    readStream.on("close", function () {
        // Flush whatever is left in the buffer as the final line.
        self.emit("line", buff);
        self.emit("close");
    });

    readStream.on("error", function (err) {
        self.emit("error", err);
    });
}

util.inherits(LineReader, EventEmitter);

module.exports = LineReader;
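If you want to sanity-check the reader on its own first, here's a minimal sketch (sample.txt is just an assumed test file):

var LineReader = require("./linereader.js");

// sample.txt is an assumed test file, not part of the original setup
var reader = new LineReader("./sample.txt", /\n/g);

reader.on("line", function (line) {
    console.log("line:", line);
});
reader.on("close", function () {
    console.log("done reading");
});
reader.on("error", console.error);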
The LineReader module reads a text file and emits a "line" event for each line, so you never hold the whole file in memory at once. Then, using the async package (or whatever async loop you prefer), loop through the files, inserting each document:
app.js
var LineReader = require("./linereader.js");
var async = require("async");

var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"];
var reader;

// Process the files one at a time so only one read stream is open at once.
async.eachSeries(paths, function (path, callback) {
    reader = new LineReader(path, /\n/g);

    reader.on("line", function (line) {
        var doc = turnTextIntoObject(line);
        db.collection("mycollection").insert(doc);
    });

    reader.on("close", callback);
    reader.on("error", callback);
}, function (err) {
    // handle error and finish
});
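Note that the insert() calls above are fire-and-forget: the "close" callback can fire before MongoDB has acknowledged every document. If that matters, one option is to count in-flight inserts and only report a file as done once they have all called back. A sketch only, reusing the turnTextIntoObject placeholder and db handle from above:

// Sketch: wait for every insert's callback before declaring a file done.
// turnTextIntoObject and db are the same placeholders used above.
function loadFile(db, path, done) {
    var reader = new LineReader(path, /\n/g);
    var pending = 0;      // inserts still in flight
    var closed = false;   // reader has emitted "close"
    var finished = false; // make sure done() fires only once

    function finish(err) {
        if (finished) { return; }
        finished = true;
        done(err);
    }

    reader.on("line", function (line) {
        pending += 1;
        db.collection("mycollection").insert(turnTextIntoObject(line), function (err) {
            pending -= 1;
            if (err) { return finish(err); }
            if (closed && pending === 0) { finish(); }
        });
    });

    reader.on("close", function () {
        closed = true;
        if (pending === 0) { finish(); }
    });

    reader.on("error", finish);
}

// Used in place of the loop body above:
// async.eachSeries(paths, function (path, callback) {
//     loadFile(db, path, callback);
// }, function (err) { /* handle error and finish */ });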
Solution 2:
Try using streams instead of loading each file into memory.
I've sent you a pull request with an implementation using streams and async I/O.
This is most of it:
var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');
var Mapping = require('./folder2siteRef.json');
var MongoClient = require('mongodb').MongoClient;

var sourcePath = '/hnet/incoming/' + new Date().getFullYear();

Async.auto({
    db: function (callback) {
        console.log('opening db connection');
        MongoClient.connect('mongodb://localhost:27017/test3', callback);
    },
    subDirectory: function (callback) {
        // read the list of subfolders, which are sites
        Fs.readdir(sourcePath, callback);
    },
    loadData: ['db', 'subDirectory', function (callback, results) {
        Async.each(results.subDirectory, load(results.db), callback);
    }],
    cleanUp: ['db', 'loadData', function (callback, results) {
        console.log('closing db connection');
        results.db.close(callback);
    }]
}, function (err) {
    console.log(err || 'Done');
});

var load = function (db) {
    return function (directory, callback) {
        var basePath = sourcePath + '/' + directory;
        Async.waterfall([
            function (callback) {
                Fs.readdir(basePath, callback); // array of files in a directory
            },
            function (files, callback) {
                console.log('loading ' + files.length + ' files from ' + directory);
                Async.each(files, function (file, callback) {
                    Fs.createReadStream(basePath + '/' + file)
                        .pipe(Csv({objectMode: true, columns: true}))
                        .pipe(transform(directory))
                        .pipe(batch(200))
                        .pipe(insert(db).on('end', callback));
                }, callback);
            }
        ], callback);
    };
};

var transform = function (directory) {
    return Es.map(function (data, callback) {
        data.siteRef = Mapping[directory];
        // 25569 shifts a spreadsheet-style day count to the Unix epoch;
        // 6 * 3600 is a fixed timezone offset
        data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
        callback(null, data);
    });
};

var insert = function (db) {
    return Es.map(
        function (data, callback) {
            if (data.length) {
                // "data" is an array produced by batch(); write it with one bulk op
                var bulk = db.collection('hnet').initializeUnorderedBulkOp();
                data.forEach(function (doc) {
                    bulk.insert(doc);
                });
                bulk.execute(callback);
            } else {
                callback();
            }
        }
    );
};

var batch = function (batchSize) {
    batchSize = batchSize || 1000;
    var batch = [];

    return Es.through(
        function write (data) {
            batch.push(data);
            if (batch.length === batchSize) {
                this.emit('data', batch);
                batch = [];
            }
        },
        function end () {
            // flush any remaining rows before ending the stream
            if (batch.length) {
                this.emit('data', batch);
                batch = [];
            }
            this.emit('end');
        }
    );
};
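To see what batch() does on its own, here is a throwaway sketch (the numbers are dummy rows) that pipes five items through batch(2) and logs the grouped arrays:

var Es = require('event-stream');

// batch() is the helper defined above; the numbers stand in for CSV rows
Es.readArray([1, 2, 3, 4, 5])
    .pipe(batch(2))
    .pipe(Es.map(function (group, callback) {
        console.log(group); // [1, 2], [3, 4], then the flushed remainder [5]
        callback(null, group);
    }));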
I've updated your tomongo.js script to use streams, and changed its file I/O from synchronous to asynchronous calls.
I tested this against the structure defined in your code with small data sets and it worked really well. I did some limited testing against 3 directories x 900 files x 288 lines. I'm not sure how big each row of your data is, so I threw a few random properties in. It's quite fast. See how it goes with your data. If it causes issues, you could try throttling it with different write concerns when executing the bulk insert operation.
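For example, a sketch of a tweaked insert() helper that passes a write concern to execute(); the { w: 1, j: true } values are just one possible setting:

var insertWithWriteConcern = function (db) {
    return Es.map(function (data, callback) {
        if (!data.length) {
            return callback();
        }
        var bulk = db.collection('hnet').initializeUnorderedBulkOp();
        data.forEach(function (doc) {
            bulk.insert(doc);
        });
        // { w: 1, j: true } is illustrative: wait for the write to be
        // acknowledged (and journaled) before letting the next batch through
        bulk.execute({ w: 1, j: true }, callback);
    });
};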
Also check out some of these links for more information on streams in Node.js:
http://nodestreams.com - a tool written by John Resig with many stream examples.
And event-stream, a very useful streams module.
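For instance, a throwaway sketch of an event-stream pipeline that splits stdin into lines and upper-cases each one:

var Es = require('event-stream');

process.stdin
    .pipe(Es.split())
    .pipe(Es.map(function (line, callback) {
        callback(null, line.toUpperCase() + '\n');
    }))
    .pipe(process.stdout);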