package base import "io" import "bytes" type AsynReader struct { buf *bytes.Buffer datach chan []byte err chan error done chan int eof bool } func NewAsynReader(r io.Reader, buflen int) *AsynReader { reader := &AsynReader{} reader.buf = bytes.NewBufferString("") reader.datach = make(chan []byte) reader.err = make(chan error) reader.done = make(chan int, 1) reader.done <- 1 go func() { data := make([]byte, buflen) for { <-reader.done n, err := r.Read(data) if err != nil && (err.Error() == "ErrNoData") { //log.Println("new asyn reader, errnodata") reader.datach <- nil reader.err <- err continue } if err != nil { //log.Println("asyn reader send error ", err) reader.datach <- nil reader.err <- err break } else { reader.datach <- data[0:n] } } }() return reader } func (reader *AsynReader) read() (error) { var err error br := false for { select { case data := <-reader.datach: if data == nil { err = <-reader.err //log.Println("asyn reader() get error", err) if err.Error() == "ErrNoData" { reader.done <- 1 //log.Println("asyn read()", err.Error()) break } br = true reader.eof = true break } reader.buf.Write(data) reader.done <- 1 default: br = true } if br { break } } return err } func (reader *AsynReader) readdata() (error) { //先尝试异步的读取数据 err := reader.read() if (err != nil) { return err } //如果buf中没有数据了,并且没有发生错误,那么等待数据 if (reader.eof == false && reader.buf.Len() == 0) { data := <-reader.datach if data == nil { err = <-reader.err //log.Println("asyn readdata() get error", err) if err.Error() == "ErrNoData" { reader.done <- 1 //log.Println("asyn readdata()", err.Error()) return err } reader.eof = true return err } reader.buf.Write(data) reader.done <- 1 } return nil } //read的特点是,如果没有数据就等待,这个和普通的读取数据差别不大。 func (reader *AsynReader) Read(b []byte) (int, error) { err := reader.readdata() if err == io.EOF && reader.buf.Len() > 0 { return reader.buf.Read(b) } if err != nil { return 0, err } return reader.buf.Read(b) } //这个是一个新添加的功能,也是这个异步读取区别一般reader的关键 //也就是在读取之前,先看看buf中有没有数据,如果没有数据,那么返回false func (reader *AsynReader) CanRead(n int) (bool, error) { err := reader.read() //发生异常,要重新来过,buf里面的数据要抛弃 if err != nil && err != io.EOF { return false, err } if n <= reader.buf.Len() { return true, nil } else { //永远不可能再读到数据了,所以要返回错误,结束程序 return false, err } return false,nil }