taskprocess.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package main
  2. import (
  3. "commonapi"
  4. "errors"
  5. "fmt"
  6. "time"
  7. _ "lite"
  8. _ "bitcoin"
  9. _ "bucash"
  10. )
  11. func sysStart() {
  12. go dataProcesser(CPUNUM)
  13. httpNetWorkServer()
  14. }
  15. type taskHandler struct {
  16. retcode int
  17. data []byte
  18. complete chan bool
  19. }
  20. func NewtaskHandler() *taskHandler {
  21. taskhdl := &taskHandler{}
  22. taskhdl.complete = make(chan bool, 1)
  23. return taskhdl
  24. }
  25. func (s *taskHandler) waitProcessData() ([]byte, error) {
  26. timeout := time.NewTicker(time.Duration(20) * time.Second)
  27. select {
  28. case <-timeout.C:
  29. log.Error("deal request timeout")
  30. return nil, errors.New("timeout")
  31. case <-s.complete:
  32. }
  33. return s.data, nil
  34. }
  35. func dataProcesser(processNum int) {
  36. for i := 0; i < processNum; i++ {
  37. go func(index int) {
  38. for {
  39. task := <-taskChan[index]
  40. go processer(task)
  41. }
  42. }(i)
  43. }
  44. }
  45. func processer(task *taskInfo) {
  46. c, err := commonapi.New(task.cointype)
  47. if err != nil {
  48. sendstr := fmt.Sprintf(`{"errcode:%v,"msg":"%v"}`, INIT_DRIVER_ERR, GetErrMsg(INIT_DRIVER_ERR, nil))
  49. log.Error("sendstr:%v", sendstr)
  50. task.resp.data = []byte(sendstr)
  51. task.resp.complete <- false
  52. return
  53. }
  54. log.Debug("urlpath:%v", task.urlpath)
  55. resp, err := commonapi.Handler[task.urlpath](c, task.jshandler)
  56. if err != nil {
  57. task.resp.data = []byte(err.Error())
  58. task.resp.complete <- false
  59. return
  60. }
  61. task.resp.retcode = OK
  62. task.resp.data = resp
  63. task.resp.complete <- true
  64. return
  65. }