使用 Node 处理长时间运行的查询

由于 Node 是单线程的，如果涉及长时间运行的计算，就需要采用变通方案，以免阻塞事件循环。

注意：这是一个可以直接运行的示例，只是别忘了获取 jQuery 并安装所需的模块。

这个例子的主要逻辑:

  1. 客户端向服务器发送请求。
  2. 服务器在单独的 Node 实例中启动任务例程，并立即返回带有对应任务 ID 的响应。
  3. 客户端不断向服务器发送检查请求，以获取给定任务 ID 的状态更新。

项目结构:

    project
    │   package.json
    │   index.html    
    │
    ├───js
    │      main.js
    │      jquery-1.12.0.min.js
    │   
    └───srv
        │    app.js
        ├─── models
        │      task.js
        └─── tasks
               data-processor.js

app.js:

var express     = require('express');
var app         = express();
var http        = require('http').Server(app);
var mongoose    = require('mongoose');
var bodyParser  = require('body-parser');

var childProcess= require('child_process');

var Task        = require('./models/task');

// Parse URL-encoded and JSON request bodies so the route handlers can read
// request.body.
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json());

// Serve the directory one level above this file as static content; per the
// project layout this is where index.html and the js/ folder live.
app.use(express.static(__dirname + '/../'));

// Explicit route for the start page. No view engine is configured for this
// app, so the page is sent as a plain file rather than through
// response.render(), which would need a template engine to be registered.
app.get('/', function(request, response){
    response.sendFile('index.html', { root: __dirname + '/../' });
});

// Route that starts a long-running task.
// It stores a Task document for status tracking, forks a separate Node process
// to do the work and answers right away with the saved task, so the client can
// poll for progress using the task's _id.
app.post('/long-running-request', function(request, response){
    var newTask = new Task({ status: 'Starting ...' });

    newTask.save(function(err, task){
        // Without a stored task there is nothing the client could poll for.
        if (err) {
            console.error('Could not create the task: ' + err.message);
            response.status(500).json({ error: 'Could not create the task.' });
            return;
        }

        // Resolve the worker script relative to this file so the server can be
        // started from any working directory. The handle is a local variable:
        // every request gets its own child process.
        var taskProcessor = childProcess.fork(__dirname + '/tasks/data-processor.js');

        // Persist every status update reported by the worker.
        taskProcessor.on('message', function(msg){
            task.status = msg.status;
            task.save(function(saveErr){
                if (saveErr) {
                    console.error('Could not update the task status: ' + saveErr.message);
                }
            });
        });

        // An 'error' event without a listener would throw in the main process.
        taskProcessor.on('error', function(childErr){
            console.error('Task processor failed: ' + childErr.message);
        });

        // Make sure the worker is gone once its streams are closed; if it has
        // already exited this call has nothing left to do.
        taskProcessor.on('close', function(){
            taskProcessor.kill();
        });

        // Hand the parameters over to the worker.
        var params = {
            message: 'Hello from main thread'
        };

        taskProcessor.send(params);
        response.status(200).json(task);
    });
});

// Status route used for polling: looks up the task whose id was posted in the
// request body and returns it as JSON so the client can read its status.
app.post('/is-ready', function(request, response){
    Task
        .findById(request.body.id)
        .exec(function(err, task){
            // The lookup itself failed; report it instead of sending an empty body.
            if (err) {
                console.error('Could not load the task: ' + err.message);
                response.status(500).json({ error: 'Could not load the task.' });
                return;
            }

            // No task is stored under this id.
            if (!task) {
                response.status(404).json({ error: 'Task not found.' });
                return;
            }

            response.status(200).json(task);
        });
});

// Connection settings for this example: the local "test" database and the
// port the HTTP server listens on.
var mongoUri = 'mongodb://localhost/test';
var listenPort = '1234';

// Open the database connection, then start accepting requests.
mongoose.connect(mongoUri);
http.listen(listenPort);

task.js:

var mongoose    = require('mongoose');

// Schema of a task: a single string field holding the latest status message.
var TaskSchema = new mongoose.Schema({
    status: { type: String }
});

// Register the schema under the model name 'Task' and export the model.
module.exports = mongoose.model('Task', TaskSchema);

data-processor.js:

// Simulates a long-running job and reports its progress to the parent process.
// `message` is the payload received from the parent; this placeholder
// implementation accepts it but does not use it.
function processData(message){
    //send status update to the main app
    process.send({ status: 'We have started processing your data.' });

    //stand-in for the long calculations
    setTimeout(function(){
        process.send({ status: 'Done!' });

        //close the channel to the parent: we are done with this task
        process.disconnect();
    }, 5000);
}

// Entry point of the worker: the parent sends the task parameters as a message.
process.on('message', function(msg){
    processData(msg.message);
});

// Last-resort handler for errors nobody caught: log what happened and then end
// the process. After an uncaught exception the process state is unreliable, so
// it exits with a failure code instead of continuing to run.
process.on('uncaughtException',function(err){
    console.log("Error happened: " + err.message + "\n" + err.stack + ".\n");
    console.log("Gracefully finish the routine.");

    process.exit(1);
});

index.html:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="utf-8">
        <title>Long-running Node requests</title>
        <!-- jQuery is loaded first because main.js uses $ -->
        <script src="./js/jquery-1.12.0.min.js"></script>
        <script src="./js/main.js"></script>
    </head>
    <body>
        <p>Example of processing long-running node requests.</p>
        <!-- main.js starts a task when this button is clicked -->
        <button id="go" type="button">Run</button>

        <br />

        <p>Log:</p>
        <!-- main.js appends every received task status to this field -->
        <textarea id="log" rows="20" cols="50"></textarea>
    </body>
</html>

main.js:

// Wires up the "Run" button: asks the server to start a long-running task and
// then polls for the task status until it reads 'Done!'.
// $(fn) is used for the ready hook because it also runs when the document is
// already loaded at the time the handler is registered.
$(function(){
    //delay before the first status check and between later checks, in ms
    var FIRST_CHECK_DELAY = 100;
    var POLL_INTERVAL = 500;

    //appends one line to the log field
    function appendLog(line){
        var $log = $("#log");
        $log.val( $log.val() + '\n' + line);
    }

    $('#go').on('click', function(e){
        //clear log
        $("#log").val('');

        $.post("/long-running-request", {some_params: 'params' })
            .done(function(task){
                appendLog(task.status);

                //function for tracking the status of the task
                function updateStatus(){
                    $.post("/is-ready", {id: task._id })
                        .done(function(response){
                            //an empty answer means the server has no such task
                            if(!response){
                                appendLog('Task not found.');
                                return;
                            }

                            appendLog(response.status);

                            //keep polling until the task reports completion
                            if(response.status !== 'Done!'){
                                setTimeout(updateStatus, POLL_INTERVAL);
                            }
                        })
                        .fail(function(){
                            //stop polling instead of failing silently
                            appendLog('Could not retrieve the task status.');
                        });
                }

                //start checking the task
                setTimeout(updateStatus, FIRST_CHECK_DELAY);
            })
            .fail(function(){
                appendLog('Could not start the task.');
            });
    });
});

package.json:

{
  "name": "nodeProcessor",
  "dependencies": {
    "body-parser": "^1.15.2",
    "express": "^4.14.0",
    "html": "0.0.10",
    "mongoose": "^4.5.5"
  }
}

免责声明：此示例仅用于说明基本概念；若要在生产环境中使用，还需要进一步改进。