|
- package tick
- //管理整体的一个结构
- //接受事件,更新
- //接受事件,fetch
- import "tickserver/framework/msq"
- import "time"
- import "errors"
- import "sync"
- import "tickserver/framework/store"
- //import "log"
// Sentinel errors of the tick package. Every message equals the variable name.
var (
	// ErrChanBusy is returned by Subscribe.Send when the subscriber's
	// buffered channel cannot take another tick without blocking.
	ErrChanBusy = errors.New("ErrChanBusy")

	// ErrChanClosed is returned by Subscribe.Send once the subscription
	// has been marked closed.
	ErrChanClosed = errors.New("ErrChanClosed")

	// The remaining sentinels are not referenced in this part of the file;
	// presumably they are used elsewhere in the package.
	ErrTimeRange      = errors.New("ErrTimeRange")
	ErrDataSourceType = errors.New("ErrDataSourceType")
	ErrTimeOrder      = errors.New("ErrTimeOrder")
	ErrNoData         = errors.New("ErrNoData")
)
- type TickServer struct {
- server *msq.MsqServer
- logclient *LogClient //源数据本地保存写文件,一个小时一个压缩文件
- client *msq.MsqClient
- queryclient *QueryClient //源数据下载信息查询(数据库tick_server的tick_index表查询)
- subss map[int64]*Subscribe
- tickss map[string]map[int64]*Market
- datasource map[string]DataSource
- logsaves map[string]*store.Save
- subId int64
- }
- func NewTickServer() (*TickServer, error) {
- s := &TickServer{}
- s.server = msq.NewMsqServer()
- go s.server.Start()
- //交易结果通过mtf发出来,配对成功之类的消息
- var err error
- s.logclient, err = NewLogClient(s.server, 0)
- if err != nil {
- return nil, err
- }
- s.queryclient, err = NewQueryClient(s.server)
- if err != nil {
- return nil, err
- }
- //处理消息
- client := msq.NewMsqClient(s.server, time.Second, false, 102400)
- go client.Recv(nil)
- err = client.ConnectPrivate()
- if err != nil {
- return nil, err
- }
- s.client = client
- s.logAction() //源数据广播给tickserver,并送入本地保存通道,并保存为内存缓存tick数据
- s.ticksAction() //获取内存缓存最新tick数据(实际没有被用到)
- s.downloadAction() //供ts下载ds本地保存的源数据文件数据,暂未支持
- s.historyAction() //供ts获取ds本地保存的源数据文件信息(tick_index表)
- s.subAction() //ts订阅行情
- s.instrumentsAction() //ts获取某类型的所有symbol的信息
- s.instrumentAction() //ts获取某类型的某个symbol的信息
- s.subss = make(map[int64]*Subscribe) //订阅信息
- s.tickss = make(map[string]map[int64]*Market) //最新tick数据
- s.datasource = make(map[string]DataSource) //数据源
- s.logsaves = make(map[string]*store.Save) //源数据保存对象
- return s, nil
- }
- func (s *TickServer) AddSaveWriter(typ string, bDownload bool) {
- ms := &MarketSave{}
- //log.Println("AddSaveWriter B")
- logsave, err := store.NewSaveWriter(serverconf.DsMap[typ].SaveDir, typ, bDownload, ms, db)
- if err != nil {
- //log.Println("debugggggggg22222222", err)
- return
- }
- //log.Println("AddSaveWriter E")
- s.logsaves[typ] = logsave
- }
- func (s *TickServer) AddDataSource(ds DataSource, bDownload bool) {
- _, ok := s.datasource[ds.Name()]
- if !ok {
- s.datasource[ds.Name()] = ds
- s.AddSaveWriter(ds.Name(), bDownload)
- mk := ds.GetMarket()
- go func() {
- for mkdata := range mk {
- //log.Println("send fuck")
- s.client.SendMessage(msq.MsgTK, mkdata) //保存到ds本地文件,广播给ts,缓存到ds内存
- }
- }()
- }
- }
- func (s *TickServer) GetMsq() *msq.MsqServer {
- return s.server
- }
- func (s *TickServer) GetClient() *msq.MsqClient {
- return s.client
- }
- func (s *TickServer) SetLogId(logId int64) {
- s.logclient.SetLogId(logId)
- }
- func (s *TickServer) GetInstruments(typ string) []Instrument {
- ds, ok := s.datasource[typ]
- if ok {
- return ds.GetInstrument()
- }
- return nil
- }
- func (s *TickServer) GetInstrument(typ string, id int64) (Instrument, error) {
- ds, ok := s.datasource[typ]
- if ok {
- inss := ds.GetInstrument()
- for _, v := range inss {
- if v.Id == id {
- return v, nil
- }
- }
- }
- var ins Instrument
- return ins, errors.New("no ins")
- }
- func (s *TickServer) logAction() {
- s.server.RegisterAction(msq.MsgTK, func(mtf *msq.MsqServer, msg *msq.Message) {
- //send to chan
- if msg.Type == msq.MsgTK {
- mk := msg.GetData().(*Market)
- //log.Println("log fuck")
- for key, sub := range s.subss {
- err := sub.Send(mk)
- if err == ErrChanClosed {
- delete(s.subss, key)
- }
- if err == ErrChanBusy {
- sub.Clear()
- sub.Send(mk)
- }
- }
- //set ticks
- ty := DataTypeName(int(mk.Type))
- insId := mk.InsId
- if data, ok := s.tickss[ty]; ok {
- data[insId] = mk
- } else {
- s.tickss[ty] = make(map[int64]*Market)
- s.tickss[ty][insId] = mk
- }
- }
- s.logclient.Log(msg)
- })
- }
// unixNow returns the current Unix time in seconds, converted to int32.
// NOTE(review): the int32 conversion wraps for times after January 2038.
func unixNow() int32 {
	return int32(time.Now().Unix())
}
// nowTime returns the current Unix time in nanoseconds.
func nowTime() int64 {
	now := time.Now()
	return now.UnixNano()
}
- func (s *TickServer) ticksAction() {
- s.server.RegisterAction(msq.MsgTKGetTicks, func(mtf *msq.MsqServer, msg *msq.Message) {
- req := msg.GetData().(*TypeRequest)
- data, ok := s.tickss[req.Type]
- if !ok {
- msg.SetErr(errors.New("ticks::req.Type no data"))
- return
- }
- var ticks []*Market
- for _, tick := range data {
- ticks = append(ticks, tick)
- }
- msg.SetData(ticks)
- })
- }
- func (s *TickServer) instrumentsAction() {
- s.server.RegisterAction(msq.MsgInss, func(mtf *msq.MsqServer, msg *msq.Message) {
- req := msg.GetData().(*InstrumentsRequest)
- inss := s.GetInstruments(req.Type)
- if inss == nil {
- msg.SetErr(errors.New("instruments::req.Type no data"))
- return
- }
- msg.SetData(inss)
- })
- }
- func (s *TickServer) instrumentAction() {
- s.server.RegisterAction(msq.MsgIns, func(mtf *msq.MsqServer, msg *msq.Message) {
- req := msg.GetData().(*InstrumentRequest)
- ins, err := s.GetInstrument(req.Type, req.Id)
- if err != nil {
- msg.SetErr(errors.New("instrument::req.Type no data"))
- return
- }
- msg.SetData(ins)
- })
- }
- func (s *TickServer) downloadAction() {
- s.server.RegisterAction(msq.MsgTKDown, func(mtf *msq.MsqServer, msg *msq.Message) {
- s.queryclient.Query(msg)
- })
- }
- func (s *TickServer) historyAction() {
- s.server.RegisterAction(msq.MsgTKHis, func(mtf *msq.MsqServer, msg *msq.Message) {
- s.queryclient.Query(msg)
- })
- }
- func (s *TickServer) subAction() {
- s.server.RegisterAction(msq.MsgTKSub, func(mtf *msq.MsqServer, msg *msq.Message) {
- req := msg.GetData().(*StreamRequest)
- sub, err := s.Subscibe(req.Type)
- msg.SetData(sub)
- msg.SetErr(err)
- })
- }
- func (s *TickServer) Start() {
- s.server.Start()
- }
- func (s *TickServer) Close() error {
- return s.server.Close()
- }
- func (s *TickServer) Subscibe(ty string) (*Subscribe, error) {
- s.subId++
- sub := &Subscribe{}
- sub.server = s
- sub.ty = ty
- sub.tyId = int32(TypeId(ty))
- sub.id = s.subId
- sub.chmk = make(chan *Market, 1024)
- s.subss[sub.id] = sub
- return sub, nil
- }
- type Subscribe struct {
- server *TickServer
- chmk chan *Market
- ty string
- id int64
- tyId int32
- isclosed bool
- mu sync.Mutex
- }
- func (sub *Subscribe) Close() {
- sub.mu.Lock()
- defer sub.mu.Unlock()
- sub.isclosed = true
- }
- func (sub *Subscribe) Clear() {
- for {
- select {
- case <-sub.chmk:
- //if mk.Type == IntTdx && mk.InsId == 1 {
- //log.Println("[subscribe]data trace", mk.Type, mk.InsId)
- //}
- default:
- return
- }
- }
- }
- func (sub *Subscribe) IsClosed() bool {
- sub.mu.Lock()
- defer sub.mu.Unlock()
- return sub.isclosed
- }
- func (sub *Subscribe) Send(mk *Market) error {
- if sub.IsClosed() {
- return ErrChanClosed
- }
- if mk.Type != sub.tyId {
- return nil
- }
- //尝试发送
- select {
- case sub.chmk <- mk:
- default:
- return ErrChanBusy
- }
- return nil
- }
|