log_client.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package msq
  2. import "time"
  3. import "log"
  4. type LogClient struct {
  5. logclient *MsqClient
  6. logId int64
  7. }
  8. func NewLogClient(server *MsqServer, logId int64) (*LogClient, error) {
  9. this := &LogClient{}
  10. this.logclient = NewMsqClient(server, time.Second, true, 10240)
  11. go this.start()
  12. err := this.logclient.ConnectPrivate()
  13. if err != nil {
  14. log.Println(err)
  15. return nil, err
  16. }
  17. this.logId = logId
  18. return this, nil
  19. }
  20. func (this *LogClient) start() {
  21. //log.Println("LogClient start()")
  22. this.logclient.RecvMulti(func (msgs []*Message) {
  23. this.log(msgs)
  24. })
  25. log.Println("LogClient start end")
  26. }
  27. func (this *LogClient) log(msgs []*Message) {
  28. msgs2 := make([]*Message, len(msgs))
  29. server := this.logclient.GetMsq()
  30. for i := 0; i < len(msgs); i++ {
  31. msgs2[i] = server.ProcessMultiStart(msgs[i])
  32. this.logId++
  33. msgs2[i].Id = this.logId
  34. }
  35. //保存到数据库
  36. err := this.log2(msgs2)
  37. if err != nil {
  38. for i := 0; i < len(msgs); i++ {
  39. msgs2[i].SetErr(err)
  40. }
  41. }
  42. //保存成功
  43. //重新发送到队列中去
  44. for i := 0; i < len(msgs); i++ {
  45. server.ProcessMultiEnd(msgs[i], msgs2[i])
  46. }
  47. }
  48. func (this *LogClient) log2(msgs []*Message) error {
  49. time.Sleep(time.Millisecond * 100)
  50. log.Println("log message:", len(msgs))
  51. return nil
  52. }
  53. func (this *LogClient) Log(msg *Message) error {
  54. return this.logclient.Send(msg)
  55. }