123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- package base
- import "io"
- import "bytes"
// AsynReader reads from an underlying io.Reader in a background goroutine and
// buffers the result, so callers can poll for available data (CanRead) as
// well as perform ordinary blocking reads (Read).
//
// Hand-off protocol between the goroutine and the consumer methods:
//   - the goroutine takes one token from done before every call to the
//     underlying Read;
//   - a successful read is delivered as a non-nil chunk on datach;
//   - a failure is delivered as a nil chunk on datach followed by the error
//     on err;
//   - the consumer puts a token back on done after handling a chunk or an
//     "ErrNoData" error; any other error is terminal and ends the goroutine.
type AsynReader struct {
	buf    *bytes.Buffer // bytes received from the goroutine and not yet consumed
	datach chan []byte   // chunks from the goroutine; nil means "an error follows on err"
	err    chan error    // error paired with each nil chunk on datach
	done   chan int      // consumer -> goroutine token: safe to call Read again
	eof    bool          // set by the consumer side once a terminal error was received
}

// NewAsynReader starts a goroutine that reads from r in chunks of at most
// buflen bytes and returns the AsynReader fed by it.
//
// Bytes returned by r.Read together with a non-nil error (for example the
// final chunk delivered alongside io.EOF) are handed to the consumer first;
// the error is reported only after the consumer has acknowledged that chunk.
//
// NOTE(review): the goroutine has no stop signal. It ends only after a
// terminal error has been handed over, so it stays blocked if the consumer
// stops calling the AsynReader methods before that.
func NewAsynReader(r io.Reader, buflen int) *AsynReader {
	reader := &AsynReader{
		buf:    new(bytes.Buffer),
		datach: make(chan []byte),
		err:    make(chan error),
		done:   make(chan int, 1),
	}
	// Prime the token so the first Read happens without waiting for a consumer.
	reader.done <- 1

	go func() {
		// data is reused for every Read. That is safe because the consumer
		// copies each chunk into buf before returning the done token.
		data := make([]byte, buflen)
		for {
			<-reader.done
			n, err := r.Read(data)

			if n > 0 || err == nil {
				reader.datach <- data[:n]
				if err == nil {
					continue
				}
				// The Read produced data AND an error: wait until the
				// consumer has taken the data before reporting the error.
				<-reader.done
			}

			reader.datach <- nil
			reader.err <- err
			// "ErrNoData" is treated as transient: keep reading once the
			// consumer returns the token. Everything else is terminal.
			if err.Error() != "ErrNoData" {
				return
			}
		}
	}()
	return reader
}
- func (reader *AsynReader) read() (error) {
- var err error
- br := false
- for {
- select {
- case data := <-reader.datach:
- if data == nil {
- err = <-reader.err
- //log.Println("asyn reader() get error", err)
- if err.Error() == "ErrNoData" {
- reader.done <- 1
- //log.Println("asyn read()", err.Error())
- break
- }
- br = true
- reader.eof = true
- break
- }
- reader.buf.Write(data)
- reader.done <- 1
- default:
- br = true
- }
- if br {
- break
- }
- }
- return err
- }
- func (reader *AsynReader) readdata() (error) {
- //先尝试异步的读取数据
- err := reader.read()
- if (err != nil) {
- return err
- }
- //如果buf中没有数据了,并且没有发生错误,那么等待数据
- if (reader.eof == false && reader.buf.Len() == 0) {
- data := <-reader.datach
- if data == nil {
- err = <-reader.err
- //log.Println("asyn readdata() get error", err)
- if err.Error() == "ErrNoData" {
- reader.done <- 1
- //log.Println("asyn readdata()", err.Error())
- return err
- }
- reader.eof = true
- return err
- }
- reader.buf.Write(data)
- reader.done <- 1
- }
- return nil
- }
- //read的特点是,如果没有数据就等待,这个和普通的读取数据差别不大。
- func (reader *AsynReader) Read(b []byte) (int, error) {
- err := reader.readdata()
- if err == io.EOF && reader.buf.Len() > 0 {
- return reader.buf.Read(b)
- }
- if err != nil {
- return 0, err
- }
- return reader.buf.Read(b)
- }
- //这个是一个新添加的功能,也是这个异步读取区别一般reader的关键
- //也就是在读取之前,先看看buf中有没有数据,如果没有数据,那么返回false
- func (reader *AsynReader) CanRead(n int) (bool, error) {
- err := reader.read()
- //发生异常,要重新来过,buf里面的数据要抛弃
- if err != nil && err != io.EOF {
- return false, err
- }
- if n <= reader.buf.Len() {
- return true, nil
- } else { //永远不可能再读到数据了,所以要返回错误,结束程序
- return false, err
- }
- return false,nil
- }
|