123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package tick
- import "time"
- //import "log"
- import "errors"
- import "tickserver/framework/msq"
- type LogClient struct {
- logclient *msq.MsqClient
- logId int64
- }
- func NewLogClient(server *msq.MsqServer, logId int64) (*LogClient, error) {
- this := &LogClient{}
- this.logclient = msq.NewMsqClient(server, time.Second, true, 10240)
- go this.start()
- err := this.logclient.ConnectPrivate()
- if err != nil {
- //log.Println(err)
- return nil, err
- }
- this.logId = logId
- return this, nil
- }
- func (this *LogClient) SetLogId(logId int64) {
- this.logId = logId
- }
- func (this *LogClient) start() {
- //log.Println("LogClient start()")
- this.logclient.RecvMulti(func(msgs []*msq.Message) {
- this.log(msgs)
- })
- //log.Println("LogClient start end")
- }
- //database id,time,insId,data
- func (this *LogClient) log(msgs []*msq.Message) {
- msgs2 := make([]*msq.Message, len(msgs))
- server := this.logclient.GetMsq()
- for i := 0; i < len(msgs); i++ {
- msgs2[i] = server.ProcessMultiStart(msgs[i])
- msgs2[i].RecvTime = nowTime()
- }
- retry := 2
- for {
- retry--
- if retry == 0 {
- break
- }
- err := this.log2(msgs2)
- if err != nil {
- //log.Println("log message err = ", err)
- time.Sleep(time.Second)
- continue
- }
- break
- }
- //保存成功
- //重新发送到队列中去
- for i := 0; i < len(msgs); i++ {
- server.ProcessMultiEnd(msgs[i], msgs2[i])
- }
- }
- func (this *LogClient) log2(msgs []*msq.Message) error {
- for i := 0; i < len(msgs); i++ {
- base, ok := msgs[i].GetData().(*Market)
- if !ok {
- msgs[i].SetErr(errors.New("msg not impl IReqbase"))
- continue
- }
- var err error
- typ := DataTypeName(int(base.Type))
- logsave, ok1 := tserver.logsaves[typ]
- if !ok1 {
- msgs[i].SetErr(errors.New("no save writer"))
- continue
- }
- m2 := Market2{
- InsId: base.InsId,
- Timestamp: base.Timestamp,
- Close: base.Close,
- Open: base.Open,
- High: base.High,
- Low: base.Low,
- AllVolume: base.AllVolume,
- AllAmount: base.AllAmount,
- LastPrice: base.LastPrice,
- LastVolume: base.LastVolume,
- //Bids: base.Bids,
- //Asks: base.Asks,
- Type: int64(base.Type),
- }
- for i, v := range base.Bids {
- if i >= 10 {
- break
- }
- m2.Bids[i] = v
- }
- for i, v := range base.Asks {
- if i >= 10 {
- break
- }
- m2.Asks[i] = v
- }
- ms := &MarketSave{m2}
- err = logsave.Save(ms)
- if err != nil {
- msgs[i].SetErr(err)
- //log.Println("debugggggggg33333333", DataTypeName(int(base.Type)), m2.InsId, err)
- continue
- }
- }
- return nil
- }
- func (this *LogClient) Log(msg *msq.Message) error {
- return this.logclient.Send(msg)
- }
|