asyn_read.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package base
  2. import "io"
  3. import "bytes"
  4. type AsynReader struct {
  5. buf *bytes.Buffer
  6. datach chan []byte
  7. err chan error
  8. done chan int
  9. eof bool
  10. }
  11. func NewAsynReader(r io.Reader, buflen int) *AsynReader {
  12. reader := &AsynReader{}
  13. reader.buf = bytes.NewBufferString("")
  14. reader.datach = make(chan []byte)
  15. reader.err = make(chan error)
  16. reader.done = make(chan int, 1)
  17. reader.done <- 1
  18. go func() {
  19. data := make([]byte, buflen)
  20. for {
  21. <-reader.done
  22. n, err := r.Read(data)
  23. if err != nil && (err.Error() == "ErrNoData") {
  24. //log.Println("new asyn reader, errnodata")
  25. reader.datach <- nil
  26. reader.err <- err
  27. continue
  28. }
  29. if err != nil {
  30. //log.Println("asyn reader send error ", err)
  31. reader.datach <- nil
  32. reader.err <- err
  33. break
  34. } else {
  35. reader.datach <- data[0:n]
  36. }
  37. }
  38. }()
  39. return reader
  40. }
  41. func (reader *AsynReader) read() (error) {
  42. var err error
  43. br := false
  44. for {
  45. select {
  46. case data := <-reader.datach:
  47. if data == nil {
  48. err = <-reader.err
  49. //log.Println("asyn reader() get error", err)
  50. if err.Error() == "ErrNoData" {
  51. reader.done <- 1
  52. //log.Println("asyn read()", err.Error())
  53. break
  54. }
  55. br = true
  56. reader.eof = true
  57. break
  58. }
  59. reader.buf.Write(data)
  60. reader.done <- 1
  61. default:
  62. br = true
  63. }
  64. if br {
  65. break
  66. }
  67. }
  68. return err
  69. }
  70. func (reader *AsynReader) readdata() (error) {
  71. //先尝试异步的读取数据
  72. err := reader.read()
  73. if (err != nil) {
  74. return err
  75. }
  76. //如果buf中没有数据了,并且没有发生错误,那么等待数据
  77. if (reader.eof == false && reader.buf.Len() == 0) {
  78. data := <-reader.datach
  79. if data == nil {
  80. err = <-reader.err
  81. //log.Println("asyn readdata() get error", err)
  82. if err.Error() == "ErrNoData" {
  83. reader.done <- 1
  84. //log.Println("asyn readdata()", err.Error())
  85. return err
  86. }
  87. reader.eof = true
  88. return err
  89. }
  90. reader.buf.Write(data)
  91. reader.done <- 1
  92. }
  93. return nil
  94. }
  95. //read的特点是,如果没有数据就等待,这个和普通的读取数据差别不大。
  96. func (reader *AsynReader) Read(b []byte) (int, error) {
  97. err := reader.readdata()
  98. if err == io.EOF && reader.buf.Len() > 0 {
  99. return reader.buf.Read(b)
  100. }
  101. if err != nil {
  102. return 0, err
  103. }
  104. return reader.buf.Read(b)
  105. }
  106. //这个是一个新添加的功能,也是这个异步读取区别一般reader的关键
  107. //也就是在读取之前,先看看buf中有没有数据,如果没有数据,那么返回false
  108. func (reader *AsynReader) CanRead(n int) (bool, error) {
  109. err := reader.read()
  110. //发生异常,要重新来过,buf里面的数据要抛弃
  111. if err != nil && err != io.EOF {
  112. return false, err
  113. }
  114. if n <= reader.buf.Len() {
  115. return true, nil
  116. } else { //永远不可能再读到数据了,所以要返回错误,结束程序
  117. return false, err
  118. }
  119. return false,nil
  120. }