// Package tick manages the tick server as a whole: it accepts market events
// and updates its state with them, and it accepts request events and fetches
// data in response.
package tick

import (
	"errors"
	"sync"
	"time"

	"tickserver/framework/msq"
	"tickserver/framework/store"
)

//import "log"

// Sentinel errors of the tick package. The message strings are part of the
// runtime behavior and are kept exactly as they were.
var (
	// ErrChanBusy is returned by Subscribe.Send when the subscription's
	// channel has no free slot.
	ErrChanBusy = errors.New("ErrChanBusy")

	// ErrChanClosed is returned by Subscribe.Send once the subscription has
	// been closed.
	ErrChanClosed = errors.New("ErrChanClosed")

	// ErrTimeRange is not referenced in this file.
	ErrTimeRange = errors.New("ErrTimeRange")

	// ErrDataSourceType is not referenced in this file.
	ErrDataSourceType = errors.New("ErrDataSourceType")

	// ErrTimeOrder is not referenced in this file.
	ErrTimeOrder = errors.New("ErrTimeOrder")

	// ErrNoData is not referenced in this file.
	ErrNoData = errors.New("ErrNoData")
)

// TickServer ties together the message queue server, the clients attached to
// it, the registered data sources, the live subscriptions and the cache of
// the latest tick per instrument.
type TickServer struct {
	server *msq.MsqServer

	// logclient saves source data to local files, one compressed file per hour.
	logclient *LogClient

	client *msq.MsqClient

	// queryclient queries source-data download information (the tick_index
	// table of the tick_server database).
	queryclient *QueryClient

	// subss holds the live subscriptions keyed by subscription id.
	subss map[int64]*Subscribe

	// tickss holds the latest tick, keyed by data type name and then by
	// instrument id.
	tickss map[string]map[int64]*Market

	// datasource holds the registered data sources keyed by name.
	datasource map[string]DataSource

	// logsaves holds the source-data save objects keyed by data source name.
	logsaves map[string]*store.Save

	// subId is the id handed to the most recently created subscription.
	subId int64
}

// NewTickServer builds a TickServer: it starts the message queue server in a
// goroutine, creates the log and query clients, connects a private client and
// registers every message handler.
//
// The maps are allocated before the server goroutine is started and before
// any handler is registered. The handlers write to s.subss and s.tickss, and a
// write to a nil map panics, so a message handled before the maps existed
// would have crashed the process.
//
// It returns the first error reported by NewLogClient, NewQueryClient or
// ConnectPrivate.
func NewTickServer() (*TickServer, error) {
	s := &TickServer{
		subss:      make(map[int64]*Subscribe),         // subscriptions
		tickss:     make(map[string]map[int64]*Market), // latest tick data
		datasource: make(map[string]DataSource),        // data sources
		logsaves:   make(map[string]*store.Save),       // source-data save objects
	}

	s.server = msq.NewMsqServer()
	go s.server.Start()

	// Trade results are emitted through mtf, e.g. "order matched" messages.
	var err error
	s.logclient, err = NewLogClient(s.server, 0)
	if err != nil {
		return nil, err
	}
	s.queryclient, err = NewQueryClient(s.server)
	if err != nil {
		return nil, err
	}

	// Message processing.
	client := msq.NewMsqClient(s.server, time.Second, false, 102400)
	go client.Recv(nil)
	if err = client.ConnectPrivate(); err != nil {
		return nil, err
	}
	s.client = client

	s.logAction()         // broadcast source data to the tick server, feed the local save channel, cache ticks in memory
	s.ticksAction()       // fetch the latest ticks from the in-memory cache (not actually used)
	s.downloadAction()    // let ts download the source data files saved locally by ds; not supported yet
	s.historyAction()     // let ts fetch information about the source data files saved locally by ds (tick_index table)
	s.subAction()         // ts subscribes to market data
	s.instrumentsAction() // ts fetches the information of every symbol of a type
	s.instrumentAction()  // ts fetches the information of one symbol of a type

	return s, nil
}

// AddSaveWriter creates the save writer for data source type typ, using the
// save directory configured in serverconf.DsMap[typ], and stores it in
// s.logsaves. When store.NewSaveWriter fails nothing is stored and the error
// is dropped, because this method has no way to report it to the caller.
func (s *TickServer) AddSaveWriter(typ string, bDownload bool) {
	ms := &MarketSave{}
	logsave, err := store.NewSaveWriter(serverconf.DsMap[typ].SaveDir, typ, bDownload, ms, db)
	if err != nil {
		return
	}
	s.logsaves[typ] = logsave
}

// AddDataSource registers ds under its name, creates its save writer and
// starts a goroutine that forwards every value received from ds.GetMarket()
// as a msq.MsgTK message. A data source whose name is already registered is
// ignored.
//
// The forwarding goroutine ends when the channel returned by ds.GetMarket()
// is closed.
func (s *TickServer) AddDataSource(ds DataSource, bDownload bool) {
	name := ds.Name()
	if _, ok := s.datasource[name]; ok {
		return
	}
	s.datasource[name] = ds
	s.AddSaveWriter(name, bDownload)

	mk := ds.GetMarket()
	go func() {
		for mkdata := range mk {
			// Saved to the ds local file, broadcast to ts, cached in ds memory.
			s.client.SendMessage(msq.MsgTK, mkdata)
		}
	}()
}

// GetMsq returns the underlying message queue server.
func (s *TickServer) GetMsq() *msq.MsqServer {
	return s.server
}

// GetClient returns the private message queue client created by NewTickServer.
func (s *TickServer) GetClient() *msq.MsqClient {
	return s.client
}

// SetLogId forwards logId to the log client.
func (s *TickServer) SetLogId(logId int64) {
	s.logclient.SetLogId(logId)
}

// GetInstruments returns the instruments of the data source registered under
// typ, or nil when no such data source is registered.
func (s *TickServer) GetInstruments(typ string) []Instrument {
	ds, ok := s.datasource[typ]
	if !ok {
		return nil
	}
	return ds.GetInstrument()
}

// GetInstrument returns the instrument with the given id from the data source
// registered under typ. It returns an error when the data source is not
// registered or has no instrument with that id.
func (s *TickServer) GetInstrument(typ string, id int64) (Instrument, error) {
	if ds, ok := s.datasource[typ]; ok {
		for _, v := range ds.GetInstrument() {
			if v.Id == id {
				return v, nil
			}
		}
	}
	var ins Instrument
	return ins, errors.New("no ins")
}

// logAction registers the msq.MsgTK handler. For a tick message the handler
// offers the tick to every subscription, records it as the latest tick of its
// instrument, and finally passes the message to the log client.
func (s *TickServer) logAction() {
	s.server.RegisterAction(msq.MsgTK, func(mtf *msq.MsqServer, msg *msq.Message) {
		if msg.Type == msq.MsgTK {
			mk := msg.GetData().(*Market)

			// Send to the subscription channels.
			for key, sub := range s.subss {
				switch sub.Send(mk) {
				case ErrChanClosed:
					// Closed subscriptions are dropped the first time a
					// tick is offered to them.
					delete(s.subss, key)
				case ErrChanBusy:
					// The consumer fell behind: discard its backlog and
					// offer the tick once more. The result of the retry is
					// ignored.
					sub.Clear()
					sub.Send(mk)
				}
			}

			// Record the latest tick of this instrument.
			ty := DataTypeName(int(mk.Type))
			latest, ok := s.tickss[ty]
			if !ok {
				latest = make(map[int64]*Market)
				s.tickss[ty] = latest
			}
			latest[mk.InsId] = mk
		}
		s.logclient.Log(msg)
	})
}

// unixNow returns the current Unix time in seconds converted to int32.
// NOTE(review): the conversion cannot represent times after 2038-01-19; the
// return type is kept because callers outside this file may depend on it.
func unixNow() int32 {
	sec := time.Now().Unix()
	return int32(sec)
}

// nowTime returns the current Unix time in nanoseconds.
func nowTime() int64 {
	return time.Now().UnixNano()
}

// ticksAction registers the msq.MsgTKGetTicks handler, which answers with the
// cached latest ticks of the requested type, or with an error when nothing is
// cached for that type.
func (s *TickServer) ticksAction() {
	s.server.RegisterAction(msq.MsgTKGetTicks, func(mtf *msq.MsqServer, msg *msq.Message) {
		req := msg.GetData().(*TypeRequest)
		data, ok := s.tickss[req.Type]
		if !ok {
			msg.SetErr(errors.New("ticks::req.Type no data"))
			return
		}
		ticks := make([]*Market, 0, len(data))
		for _, tick := range data {
			ticks = append(ticks, tick)
		}
		msg.SetData(ticks)
	})
}

// instrumentsAction registers the msq.MsgInss handler, which answers with all
// instruments of the requested type, or with an error when GetInstruments
// returns nil.
func (s *TickServer) instrumentsAction() {
	s.server.RegisterAction(msq.MsgInss, func(mtf *msq.MsqServer, msg *msq.Message) {
		req := msg.GetData().(*InstrumentsRequest)
		inss := s.GetInstruments(req.Type)
		if inss == nil {
			msg.SetErr(errors.New("instruments::req.Type no data"))
			return
		}
		msg.SetData(inss)
	})
}

// instrumentAction registers the msq.MsgIns handler, which answers with the
// instrument identified by the requested type and id, or with an error when
// GetInstrument fails.
func (s *TickServer) instrumentAction() {
	s.server.RegisterAction(msq.MsgIns, func(mtf *msq.MsqServer, msg *msq.Message) {
		req := msg.GetData().(*InstrumentRequest)
		ins, err := s.GetInstrument(req.Type, req.Id)
		if err != nil {
			msg.SetErr(errors.New("instrument::req.Type no data"))
			return
		}
		msg.SetData(ins)
	})
}

// downloadAction registers the msq.MsgTKDown handler, which hands the message
// to the query client.
func (s *TickServer) downloadAction() {
	s.server.RegisterAction(msq.MsgTKDown, func(mtf *msq.MsqServer, msg *msq.Message) {
		s.queryclient.Query(msg)
	})
}

// historyAction registers the msq.MsgTKHis handler, which hands the message
// to the query client.
func (s *TickServer) historyAction() {
	s.server.RegisterAction(msq.MsgTKHis, func(mtf *msq.MsqServer, msg *msq.Message) {
		s.queryclient.Query(msg)
	})
}

// subAction registers the msq.MsgTKSub handler, which creates a subscription
// for the requested type and stores it, together with the error from
// Subscibe, on the message.
func (s *TickServer) subAction() {
	s.server.RegisterAction(msq.MsgTKSub, func(mtf *msq.MsqServer, msg *msq.Message) {
		req := msg.GetData().(*StreamRequest)
		sub, err := s.Subscibe(req.Type)
		msg.SetData(sub)
		msg.SetErr(err)
	})
}

// Start calls Start on the underlying message queue server.
// NOTE(review): NewTickServer already runs s.server.Start in a goroutine;
// confirm that calling it a second time is intended.
func (s *TickServer) Start() {
	s.server.Start()
}

// Close closes the underlying message queue server and returns its error.
func (s *TickServer) Close() error {
	return s.server.Close()
}

// Subscibe creates a subscription for data type ty, gives it the next
// subscription id and a channel buffered for 1024 ticks, and registers it with
// the server. The returned error is always nil.
//
// The misspelled name is kept because it is part of the exported interface.
func (s *TickServer) Subscibe(ty string) (*Subscribe, error) {
	s.subId++
	sub := &Subscribe{
		server: s,
		ty:     ty,
		tyId:   int32(TypeId(ty)),
		id:     s.subId,
		chmk:   make(chan *Market, 1024),
	}
	s.subss[sub.id] = sub
	return sub, nil
}

// Subscribe is one subscription to the ticks of a single data type. Ticks are
// delivered through the buffered channel chmk.
type Subscribe struct {
	server *TickServer
	chmk   chan *Market
	ty     string
	id     int64
	tyId   int32

	// isclosed is guarded by mu.
	isclosed bool
	mu       sync.Mutex
}

// Close marks the subscription as closed. The channel itself is not closed;
// later calls to Send return ErrChanClosed.
func (sub *Subscribe) Close() {
	sub.mu.Lock()
	defer sub.mu.Unlock()
	sub.isclosed = true
}

// Clear discards every tick currently buffered in the channel and returns as
// soon as a receive would block.
func (sub *Subscribe) Clear() {
	for {
		select {
		case <-sub.chmk:
		default:
			return
		}
	}
}

// IsClosed reports whether Close has been called.
func (sub *Subscribe) IsClosed() bool {
	sub.mu.Lock()
	defer sub.mu.Unlock()
	return sub.isclosed
}

// Send offers mk to the subscription without blocking. It returns
// ErrChanClosed when the subscription is closed, nil when mk is of another
// data type (the tick is skipped) or was queued, and ErrChanBusy when the
// channel is full.
func (sub *Subscribe) Send(mk *Market) error {
	if sub.IsClosed() {
		return ErrChanClosed
	}
	if mk.Type != sub.tyId {
		return nil
	}
	// Try to send.
	select {
	case sub.chmk <- mk:
		return nil
	default:
		return ErrChanBusy
	}
}