1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package msq
- import "time"
- import "log"
- type LogClient struct {
- logclient *MsqClient
- logId int64
- }
- func NewLogClient(server *MsqServer, logId int64) (*LogClient, error) {
- this := &LogClient{}
- this.logclient = NewMsqClient(server, time.Second, true, 10240)
- go this.start()
- err := this.logclient.ConnectPrivate()
- if err != nil {
- log.Println(err)
- return nil, err
- }
- this.logId = logId
- return this, nil
- }
- func (this *LogClient) start() {
- //log.Println("LogClient start()")
- this.logclient.RecvMulti(func (msgs []*Message) {
- this.log(msgs)
- })
- log.Println("LogClient start end")
- }
- func (this *LogClient) log(msgs []*Message) {
- msgs2 := make([]*Message, len(msgs))
- server := this.logclient.GetMsq()
- for i := 0; i < len(msgs); i++ {
- msgs2[i] = server.ProcessMultiStart(msgs[i])
- this.logId++
- msgs2[i].Id = this.logId
- }
- //保存到数据库
- err := this.log2(msgs2)
- if err != nil {
- for i := 0; i < len(msgs); i++ {
- msgs2[i].SetErr(err)
- }
- }
- //保存成功
- //重新发送到队列中去
- for i := 0; i < len(msgs); i++ {
- server.ProcessMultiEnd(msgs[i], msgs2[i])
- }
- }
- func (this *LogClient) log2(msgs []*Message) error {
- time.Sleep(time.Millisecond * 100)
- log.Println("log message:", len(msgs))
- return nil
- }
- func (this *LogClient) Log(msg *Message) error {
- return this.logclient.Send(msg)
- }
|