中国最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2

nodejs教程

Node.js

阅读 (2277)

稳定性: 2 - 不稳定

流是一个抽象接口,在 Node 里被不同的对象实现。例如request to an HTTPserver 是流,stdout 是流。流是可读,可写,或者可读写。所有的流是 EventEmitter 的实例。

你可以通过 require('stream') 加载 Stream 基类。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类。

这个文档分为 3 个章节。第一个章节解释了在你的程序中使用流时候需要了解的部分。如果你不用实现流式 API,可以只看这个章节。

如果你想实现你自己的流,第二个章节解释了这部分 API。这些 API 让你的实现更加简单。

第三个部分深入的解释了流是如何工作的,包括一些内部机制和函数,这些内容不要改动,除非你明确知道你要做什么。

面向流消费者的 API

流可以是可读(Readable),可写(Writable),或者兼具两者(Duplex,双工)的。

所有的流都是事件分发器(EventEmitters),但是也有自己的方法和属性,这取决于他它们是可读(Readable),可写(Writable),或者兼具两者(Duplex,双工)的。

如果流式可读写的,则它实现了下面的所有方法和事件。因此,这个章节 API 完全阐述了DuplexTransform 流,即便他们的实现有所不同。

没有必要为了消费流而在你的程序里实现流的接口。如果你正在你的程序里实现流接口,请同时参考下面的API for Stream Implementors

基本所有的 Node 程序,无论多简单,都会使用到流。这有一个使用流的例子。

javascript
var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is 可读流(Readable stream)
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // 可读流(Readable stream) emit 'data' 事件 once a 监听器(listener) is added
  req.on('data', function (chunk) {
    body += chunk;
  });

  // the end 事件 tells you that you have entire body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

类: stream.Readable

可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来来自

可读流(Readable stream)不会分发数据,直到你表明准备就绪。

可读流(Readable stream) 有2种模式: 流动模式(flowing mode)暂停模式(paused mode). 流动模式(flowing mode)时,尽快的从底层系统读取数据并提供给你的程序。 暂停模式(paused mode)时, 你必须明确的调用 stream.read() 来读取数据。 暂停模式(paused mode) 是默认模式。

注意: 如果没有绑定数据处理函数,并且没有 pipe() 目标,流会切换到流动模式(flowing mode),并且数据会丢失。

可以通过下面几个方法,将流切换到流动模式(flowing mode)。

  • 添加一个 ['data' 事件][] 事件处理器来监听数据.
  • 调用 resume() 方法来明确的开启数据流。
  • 调用 pipe() 方法来发送数据给Writable.

可以通过以下方法来切换到暂停模式(paused mode):

  • 如果没有 导流(pipe) 目标,调用 pause()方法.
  • 如果有 导流(pipe) 目标, 移除所有的 ['data' 事件][]处理函数, 调用 unpipe() 方法移除所有的 导流(pipe) 目标。

注意, 为了向后兼容考虑, 移除 'data' 事件监听器并不会自动暂停流。同样的,当有导流目标时,调用 pause() 并不能保证流在那些目标排空后,请求更多数据时保持暂停状态。

可读流(Readable stream)例子包括:

事件: 'readable'

当一个数据块可以从流中读出,将会触发'readable' 事件.`

某些情况下, 如果没有准备好,监听一个 'readable' 事件将会导致一些数据从底层系统读取到内部缓存。

javascript
var readble = getReadableStreamSomehow();
readable.on('readable', function() {
  // there is some data to read now
});

一旦内部缓存排空,一旦有更多数据将会再次触发 readable 事件。

事件: 'data'

  • chunk {Buffer | String} 数据块

绑定一个 data 事件的监听器(listener)到一个未明确暂停的流,会将流切换到流动模式。数据会尽额能的传递。

如果你像尽快的从流中获取数据,这是最快的方法。

javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});

事件: 'end'

如果没有更多的可读数据,将会触发这个事件。

注意,除非数据已经被完全消费, the end 事件才会触发。 可以通过切换到流动模式(flowing mode)来实现,或者通过调用重复调用 read()获取数据,直到结束。

javascript
    var readable = getReadableStreamSomehow();
    readable.on('data', function(chunk) {
        console.log('got %d bytes of data', chunk.length);
    });
    readable.on('end', function() {
        console.log('there will be no more data.');
    });  

事件: 'close'

当底层资源(例如源头的文件描述符)关闭时触发。并不是所有流都会触发这个事件。

事件: 'error'

  • {Error Object}

当接收数据时发生错误触发。

readable.read([size])

  • size {Number} 可选参数, 需要读入的数据量
  • 返回 {String | Buffer | null}

read() 方法从内部缓存中拉取数据。如果没有可用数据,将会返回null

如果传了 size参数,将会返回相当字节的数据。如果size不可用,将会返回 null

如果你没有指定 size 参数。将会返回内部缓存的所有数据。

这个方法仅能再暂停模式(paused mode)里调用. 流动模式(flowing mode)下这个方法会被自动调用直到内存缓存排空。

javascript
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});

如果这个方法返回一个数据块, 它同时也会触发['data' 事件][].

readable.setEncoding(encoding)

  • encoding {String} 要使用的编码.
  • 返回: this

调用此函数会使得流返回指定编码的字符串,而不是 Buffer 对象。例如,如果你调用readable.setEncoding('utf8'),输出数据将会是UTF-8 编码,并且返回字符串。如果你调用 readable.setEncoding('hex'),将会返回2进制编码的数据。

该方法能正确处理多字节字符。如果不想这么做,仅简单的直接拉取缓存并调buf.toString(encoding) ,可能会导致字节错位。因此,如果你想以字符串读取数据,请使用这个方法。

javascript
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

readable.resume()

  • 返回: this

这个方法让可读流(Readable stream)继续触发 data 事件.

这个方法会将流切换到流动模式(flowing mode). 如果你不想从流中消费数据,而想得到end 事件,可以调用 readable.resume() 来打开数据流。

javascript
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
  console.log('got to the end, but did not read anything');
});

readable.pause()

  • 返回: this

这个方法会使得流动模式(flowing mode)的流停止触发 data 事件, 切换到流动模式(flowing mode). 并让后续可用数据留在内部缓冲区中。

javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});

readable.isPaused()

  • 返回: Boolean

这个方法返回readable 是否被客户端代码 明确的暂停(调用 readable.pause())。

var readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

readable.pipe(destination[, options])

  • destination {Writable Stream} 写入数据的目标
  • options {Object} 导流(pipe) 选项
    • end {Boolean} 读取到结束符时,结束写入者。默认 = true

这个方法从可读流(Readable stream)拉取所有数据, 并将数据写入到提供的目标中。自动管理流量,这样目标不会快速的可读流(Readable stream)淹没。

可以导流到多个目标。

javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);

这个函数返回目标流, 因此你可以建立导流链:

javascript
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

例如, 模拟 Unix 的 cat 命令:

javascript
process.stdin.pipe(process.stdout);

默认情况下,当源数据流触发 end的时候调用end(),所以 destination 不可再写。传 { end:false }作为options,可以保持目标流打开状态。

这会让 writer保持打开状态,可以在最后写入"Goodbye" 。

javascript
reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});

注意 process.stderrprocess.stdout 直到进程结束才会关闭,无论是否指定

readable.unpipe([destination])

  • destination {Writable Stream} 可选,指定解除导流的流

这个方法会解除之前调用 pipe() 设置的钩子( pipe() )。

如果没有指定 destination,所有的 导流(pipe) 都会被移除。

如果指定了 destination,但是没有建立如果没有指定 destination,则什么事情都不会发生。

javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
  console.log('stop writing to file.txt');
  readable.unpipe(writable);
  console.log('manually close the file stream');
  writable.end();
}, 1000);

readable.unshift(chunk)

  • chunk {Buffer | String} 数据块插入到读队列中

这个方法很有用,当一个流正被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”,返回到原来的源,以便流能将它传递给其它消费者。

如果你在程序中必须经常调用 stream.unshift(chunk) ,那你可以考虑实现Transform来替换(参见下文API for Stream Implementors)。

javascript
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  var decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        var remaining = split.join('\n\n');
        var buf = new Buffer(remaining, 'utf8');
        if (buf.length)
          stream.unshift(buf);
        stream.removeListener('error', callback);
        stream.removeListener('readable', onReadable);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

readable.wrap(stream)

  • stream {Stream} 一个旧式的可读流(Readable stream)

v0.10 版本之前的 Node 流并未实现现在所有流的API(更多信息详见下文“兼容性”章节)。

如果你使用的是旧的 Node 库,它触发 'data' 事件,并拥有仅做查询用的pause() 方法,那么你能使用wrap() 方法来创建一个Readable 流来使用旧版本的流,作为数据源。

你应该很少需要用到这个函数,但它会留下方便和旧版本的 Node 程序和库交互。

例如:

javascript
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);

myReader.on('readable', function() {
  myReader.read(); // etc.
});

类: stream.Writable

可写流(Writable stream )接口是你正把数据写到一个目标的抽象。

可写流(Writable stream )的例子包括:

writable.write(chunk[, encoding][, callback])

  • chunk {String | Buffer} 准备写的数据
  • encoding {String} 编码方式(如果chunk 是字符串)
  • callback {Function} 数据块写入后的回调
  • 返回: {Boolean} 如果数据已被全部处理返回true

这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。

返回值表示你是否应该继续立即写入。如果数据要缓存在内部,将会返回false。否则返回 true

返回值仅供参考。即使返回 false,你也可能继续写。但是写会缓存在内存里,所以不要做的太过分。最好的办法是等待drain 事件后,再写入数据。

事件: 'drain'

如果调用 writable.write(chunk) 返回 false, drain 事件会告诉你什么时候将更多的数据写入到流中。

javascript
// Write the data to the supplied 可写流(Writable stream ) 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    

←上一篇: Node.js 加密
关闭
程序员人生