node.js的http.createServer过程深入解析
- 作者: 嗫?暁雲?
- 来源: 51数据库
- 2021-07-09
下面是nodejs创建一个服务器的代码。接下来我们一起分析这个过程。
var http = require('http');
http.createserver(function (request, response) {
response.end('hello world
');
}).listen(9297);
首先我们去到lib/http.js模块看一下这个函数的代码。
function createserver(requestlistener) {
return new server(requestlistener);
}
只是对_http_server.js做了些封装。我们继续往下看。
function server(requestlistener) {
if (!(this instanceof server)) return new server(requestlistener);
net.server.call(this, { allowhalfopen: true });
// 收到http请求时执行的回调
if (requestlistener) {
this.on('request', requestlistener);
}
this.httpallowhalfopen = false;
// 建立tcp连接的回调
this.on('connection', connectionlistener);
this.timeout = 2 * 60 * 1000;
this.keepalivetimeout = 5000;
this._pendingresponsedata = 0;
this.maxheaderscount = null;
}
util.inherits(server, net.server);
发现_http_server.js也没有太多逻辑,继续看lib/net.js下的代码。
function server(options, connectionlistener) {
if (!(this instanceof server))
return new server(options, connectionlistener);
eventemitter.call(this);
// connectionlistener在http.js处理过了
if (typeof options === 'function') {
connectionlistener = options;
options = {};
this.on('connection', connectionlistener);
} else if (options == null || typeof options === 'object') {
options = options || {};
if (typeof connectionlistener === 'function') {
this.on('connection', connectionlistener);
}
} else {
throw new errors.typeerror('err_invalid_arg_type',
'options',
'object',
options);
}
this._connections = 0;
......
this[async_id_symbol] = -1;
this._handle = null;
this._usingworkers = false;
this._workers = [];
this._unref = false;
this.allowhalfopen = options.allowhalfopen || false;
this.pauseonconnect = !!options.pauseonconnect;
}
至此http.createserver就执行结束了,我们发现这个过程还没有涉及到很多逻辑,并且还是停留到js层面。接下来我们继续分析listen函数的过程。该函数是net模块提供的。我们只看关键的代码。
server.prototype.listen = function(...args) {
// 处理入参,根据文档我们知道listen可以接收好几个参数,我们这里是只传了端口号9297
var normalized = normalizeargs(args);
// normalized = [{port: 9297}, null];
var options = normalized[0];
var cb = normalized[1];
// 第一次listen的时候会创建,如果非空说明已经listen过
if (this._handle) {
throw new errors.error('err_server_already_listen');
}
......
listenincluster(this, null, options.port | 0, 4,
backlog, undefined, options.exclusive);
}
function listenincluster() {
...
server._listen2(address, port, addresstype, backlog, fd);
}
_listen2 = setuplistenhandle = function() {
......
this._handle = createserverhandle(...);
this._handle.listen(backlog || 511);
}
function createserverhandle() {
handle = new tcp(tcpconstants.server);
handle.bind(address, port);
}
到这我们终于看到了tcp连接的内容,每一个服务器新建一个handle并且保存他,该handle是一个tcp对象。然后执行bind和listen函数。接下来我们就看一下tcp类的代码。tcp是c++提供的类。对应的文件是tcp_wrap.cc。我们看看new tcp的时候发生了什么。
void tcpwrap::new(const functioncallbackinfo<value>& args) {
// this constructor should not be exposed to public javascript.
// therefore we assert that we are not trying to call this as a
// normal function.
check(args.isconstructcall());
check(args[0]->isint32());
environment* env = environment::getcurrent(args);
int type_value = args[0].as<int32>()->value();
tcpwrap::sockettype type = static_cast<tcpwrap::sockettype>(type_value);
providertype provider;
switch (type) {
case socket:
provider = provider_tcpwrap;
break;
case server:
provider = provider_tcpserverwrap;
break;
default:
unreachable();
}
new tcpwrap(env, args.this(), provider);
}
tcpwrap::tcpwrap(environment* env, local<object> object, providertype provider)
: connectionwrap(env, object, provider) {
int r = uv_tcp_init(env->event_loop(), &handle_);
check_eq(r, 0);
}
我们看到,new tcp的时候其实是执行libuv的uv_tcp_init函数,初始化一个uv_tcp_t的结构体。首先我们先看一下uv_tcp_t结构体的结构。

uv_tcp_t
uv_tcp_t
// 初始化一个tcp流的结构体
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
// 未指定未指定协议
return uv_tcp_init_ex(loop, tcp, af_unspec);
}
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
int domain;
/* use the lower 8 bits for the domain */
// 低八位是domain
domain = flags & 0xff;
if (domain != af_inet && domain != af_inet6 && domain != af_unspec)
return uv_einval;
// 除了第八位的其他位是flags
if (flags & ~0xff)
return uv_einval;
uv__stream_init(loop, (uv_stream_t*)tcp, uv_tcp);
/* if anything fails beyond this point we need to remove the handle from
* the handle queue, since it was added by uv__handle_init in uv_stream_init.
*/
if (domain != af_unspec) {
int err = maybe_new_socket(tcp, domain, 0);
if (err) {
// 出错则把该handle移除loop队列
queue_remove(&tcp->handle_queue);
return err;
}
}
return 0;
}
我们接着看uv__stream_init做了什么事情。
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = null;
stream->alloc_cb = null;
stream->close_cb = null;
stream->connection_cb = null;
stream->connect_req = null;
stream->shutdown_req = null;
stream->accepted_fd = -1;
stream->queued_fds = null;
stream->delayed_error = 0;
queue_init(&stream->write_queue);
queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;
if (loop->emfile_fd == -1) {
err = uv__open_cloexec("/dev/null", o_rdonly);
if (err < 0)
/* in the rare case that "/dev/null" isn't mounted open "/"
* instead.
*/
err = uv__open_cloexec("/", o_rdonly);
if (err >= 0)
loop->emfile_fd = err;
}
#if defined(__apple__)
stream->select = null;
#endif /* defined(__apple_) */
// 初始化io观察者
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
assert(cb != null);
assert(fd >= -1);
// 初始化队列,回调,需要监听的fd
queue_init(&w->pending_queue);
queue_init(&w->watcher_queue);
w->cb = cb;
w->fd = fd;
w->events = 0;
w->pevents = 0;
#if defined(uv_have_kqueue)
w->rcount = 0;
w->wcount = 0;
#endif /* defined(uv_have_kqueue) */
}
从代码可以知道,只是对uv_tcp_t结构体做了一些初始化操作。到这,new tcp的逻辑就执行完毕了。接下来就是继续分类nodejs里调用bind和listen的逻辑。nodejs的bind对应libuv的函数是uv__tcp_bind,listen对应的是uv_tcp_listen。
先看一个bind的核心代码。
/* cannot set ipv6-only mode on non-ipv6 socket. */
if ((flags & uv_tcp_ipv6only) && addr->sa_family != af_inet6)
return uv_einval;
// 获取一个socket并且设置某些标记
err = maybe_new_socket(tcp, addr->sa_family, 0);
if (err)
return err;
on = 1;
// 设置在端口可重用
if (setsockopt(tcp->io_watcher.fd, sol_socket, so_reuseaddr, &on, sizeof(on)))
return uv__err(errno);
bind(tcp->io_watcher.fd, addr, addrlen) && errno != eaddrinuse
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
if (domain == af_unspec) {
handle->flags |= flags;
return 0;
}
return new_socket(handle, domain, flags);
}
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
int sockfd;
int err;
// 获取一个socket
err = uv__socket(domain, sock_stream, 0);
if (err < 0)
return err;
sockfd = err;
// 设置选项和保存socket的文件描述符到io观察者中
err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
if (err) {
uv__close(sockfd);
return err;
}
...
return 0;
}
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
return uv_ebusy;
assert(fd >= 0);
stream->flags |= flags;
if (stream->type == uv_tcp) {
if ((stream->flags & uv_handle_tcp_nodelay) && uv__tcp_nodelay(fd, 1))
return uv__err(errno);
/* todo use delay the user passed in. */
if ((stream->flags & uv_handle_tcp_keepalive) &&
uv__tcp_keepalive(fd, 1, 60)) {
return uv__err(errno);
}
}
...
// 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
stream->io_watcher.fd = fd;
return 0;
}
上面的一系列操作主要是新建一个socket文件描述符,设置一些flag,然后把文件描述符保存到io观察者中,libuv在poll io阶段会监听该文件描述符,如果有事件到来,会执行设置的回调函数,该函数是在uv__stream_init里设置的uv__stream_io。最后执行bind函数进行绑定操作。最后我们来分析一下listen函数。首先看下tcp_wrapper.cc的代码。
void tcpwrap::listen(const functioncallbackinfo<value>& args) {
tcpwrap* wrap;
assign_or_return_unwrap(&wrap,
args.holder(),
args.getreturnvalue().set(uv_ebadf));
int backlog = args[0]->int32value();
int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
backlog,
onconnection);
args.getreturnvalue().set(err);
}
代码中有个很重要的地方就是onconnection函数,nodejs给listen函数设置了一个回调函数onconnection,该函数在io观察者里保存的文件描述符有连接到来时会被调用。onconnection函数是在connection_wrap.cc定义的,tcp_wrapper继承了connection_wrap。下面我们先看一下uv_listen。该函数调用了uv_tcp_listen。该函数的核心代码如下。
if (listen(tcp->io_watcher.fd, backlog)) return uv__err(errno); // cb即onconnection tcp->connection_cb = cb; tcp->flags |= uv_handle_bound; // 有连接到来时的libuv层回调,覆盖了uv_stream_init时设置的值 tcp->io_watcher.cb = uv__server_io; // 注册事件 uv__io_start(tcp->loop, &tcp->io_watcher, pollin);
在libuv的poll io阶段,epoll_wait会监听到到来的连接,然后调用uv__server_io。下面是该函数的核心代码。
// 继续注册事件,等待连接 uv__io_start(stream->loop, &stream->io_watcher, pollin); err = uv__accept(uv__stream_fd(stream)); // 保存连接对应的socket stream->accepted_fd = err; // 执行nodejs层回调 stream->connection_cb(stream, 0);
libuv会摘下一个连接,得到对应的socket。然后执行nodejs层的回调,这时候我们来看一下onconnection的代码。
onconnection(uv_stream_t* handle,int status)
if (status == 0) {
// 新建一个uv_tcp_t结构体
local<object> client_obj = wraptype::instantiate(env, wrap_data, wraptype::socket);
wraptype* wrap;
assign_or_return_unwrap(&wrap, client_obj);
uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_);
// uv_accept返回0表示成功
if (uv_accept(handle, client_handle))
return;
argv[1] = client_obj;
}
// 执行上层的回调,该回调是net.js设置的onconnection
wrap_data->makecallback(env->onconnection_string(), arraysize(argv), argv);
onconnection新建了一个uv_tcp_t结构体。代表这个连接。然后调用uv_accept。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
...
// 新建的uv_tcp_t结构体关联accept_fd,注册读写事件
uv__stream_open(client, server->accepted_fd, uv_handle_readable | uv_handle_writable);
...
}
最后执行nodejs的回调。
function onconnection(err, clienthandle) {
var handle = this;
var self = handle.owner;
if (err) {
self.emit('error', errnoexception(err, 'accept'));
return;
}
if (self.maxconnections && self._connections >= self.maxconnections) {
clienthandle.close();
return;
}
var socket = new socket({
handle: clienthandle,
allowhalfopen: self.allowhalfopen,
pauseoncreate: self.pauseonconnect
});
socket.readable = socket.writable = true;
self._connections++;
socket.server = self;
socket._server = self;
dtrace_net_server_connection(socket);
lttng_net_server_connection(socket);
counter_net_server_connection(socket);
// 触发_http_server.js里设置的connectionlistener回调
self.emit('connection', socket);
}
listen函数总体的逻辑就是把socket设置为可监听,然后注册事件,等待连接的到来,连接到来的时候,调用accept获取新建立的连接,tcp_wrapper.cc的回调新建一个uv_tcp_t结构体,代表新的连接,然后设置可读写事件,并且设置回调为uv__stream_io,等待数据的到来。最后执行_http_server.js设置的回调connectionlistener。至此,服务器启动并且接收连接的过程就完成了。接下来就是对用户数据的读写。当用户传来数据时,处理数据的函数是uv__stream_io。后面继续解析数据的读写。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。
