tickserver.go 7.6 KB


  1. package tick
  2. // Top-level structure that manages the whole tick service.
  3. // Accepts events and updates state.
  4. // Accepts events and fetches data.
  5. import "tickserver/framework/msq"
  6. import "time"
  7. import "errors"
  8. import "sync"
  9. import "tickserver/framework/store"
  10. //import "log"
  // Sentinel errors declared by the tick package. Only ErrChanBusy and
  // ErrChanClosed are referenced in the visible part of this file.
  var (
  	// ErrChanBusy is returned by Subscribe.Send when the subscriber's
  	// buffered channel cannot accept another tick without blocking.
  	ErrChanBusy = errors.New("ErrChanBusy")
  	// ErrChanClosed is returned by Subscribe.Send once the subscription has
  	// been marked closed.
  	ErrChanClosed = errors.New("ErrChanClosed")
  	// ErrTimeRange is not referenced in the visible part of this file.
  	ErrTimeRange = errors.New("ErrTimeRange")
  	// ErrDataSourceType is not referenced in the visible part of this file.
  	ErrDataSourceType = errors.New("ErrDataSourceType")
  	// ErrTimeOrder is not referenced in the visible part of this file.
  	ErrTimeOrder = errors.New("ErrTimeOrder")
  	// ErrNoData is not referenced in the visible part of this file.
  	ErrNoData = errors.New("ErrNoData")
  )
  17. type TickServer struct {
  18. server *msq.MsqServer
  19. logclient *LogClient //源数据本地保存写文件,一个小时一个压缩文件
  20. client *msq.MsqClient
  21. queryclient *QueryClient //源数据下载信息查询(数据库tick_server的tick_index表查询)
  22. subss map[int64]*Subscribe
  23. tickss map[string]map[int64]*Market
  24. datasource map[string]DataSource
  25. logsaves map[string]*store.Save
  26. subId int64
  27. }
  28. func NewTickServer() (*TickServer, error) {
  29. s := &TickServer{}
  30. s.server = msq.NewMsqServer()
  31. go s.server.Start()
  32. //交易结果通过mtf发出来,配对成功之类的消息
  33. var err error
  34. s.logclient, err = NewLogClient(s.server, 0)
  35. if err != nil {
  36. return nil, err
  37. }
  38. s.queryclient, err = NewQueryClient(s.server)
  39. if err != nil {
  40. return nil, err
  41. }
  42. //处理消息
  43. client := msq.NewMsqClient(s.server, time.Second, false, 102400)
  44. go client.Recv(nil)
  45. err = client.ConnectPrivate()
  46. if err != nil {
  47. return nil, err
  48. }
  49. s.client = client
  50. s.logAction() //源数据广播给tickserver,并送入本地保存通道,并保存为内存缓存tick数据
  51. s.ticksAction() //获取内存缓存最新tick数据(实际没有被用到)
  52. s.downloadAction() //供ts下载ds本地保存的源数据文件数据,暂未支持
  53. s.historyAction() //供ts获取ds本地保存的源数据文件信息(tick_index表)
  54. s.subAction() //ts订阅行情
  55. s.instrumentsAction() //ts获取某类型的所有symbol的信息
  56. s.instrumentAction() //ts获取某类型的某个symbol的信息
  57. s.subss = make(map[int64]*Subscribe) //订阅信息
  58. s.tickss = make(map[string]map[int64]*Market) //最新tick数据
  59. s.datasource = make(map[string]DataSource) //数据源
  60. s.logsaves = make(map[string]*store.Save) //源数据保存对象
  61. return s, nil
  62. }
  63. func (s *TickServer) AddSaveWriter(typ string, bDownload bool) {
  64. ms := &MarketSave{}
  65. //log.Println("AddSaveWriter B")
  66. logsave, err := store.NewSaveWriter(serverconf.DsMap[typ].SaveDir, typ, bDownload, ms, db)
  67. if err != nil {
  68. //log.Println("debugggggggg22222222", err)
  69. return
  70. }
  71. //log.Println("AddSaveWriter E")
  72. s.logsaves[typ] = logsave
  73. }
  74. func (s *TickServer) AddDataSource(ds DataSource, bDownload bool) {
  75. _, ok := s.datasource[ds.Name()]
  76. if !ok {
  77. s.datasource[ds.Name()] = ds
  78. s.AddSaveWriter(ds.Name(), bDownload)
  79. mk := ds.GetMarket()
  80. go func() {
  81. for mkdata := range mk {
  82. //log.Println("send fuck")
  83. s.client.SendMessage(msq.MsgTK, mkdata) //保存到ds本地文件,广播给ts,缓存到ds内存
  84. }
  85. }()
  86. }
  87. }
  88. func (s *TickServer) GetMsq() *msq.MsqServer {
  89. return s.server
  90. }
  91. func (s *TickServer) GetClient() *msq.MsqClient {
  92. return s.client
  93. }
  94. func (s *TickServer) SetLogId(logId int64) {
  95. s.logclient.SetLogId(logId)
  96. }
  97. func (s *TickServer) GetInstruments(typ string) []Instrument {
  98. ds, ok := s.datasource[typ]
  99. if ok {
  100. return ds.GetInstrument()
  101. }
  102. return nil
  103. }
  104. func (s *TickServer) GetInstrument(typ string, id int64) (Instrument, error) {
  105. ds, ok := s.datasource[typ]
  106. if ok {
  107. inss := ds.GetInstrument()
  108. for _, v := range inss {
  109. if v.Id == id {
  110. return v, nil
  111. }
  112. }
  113. }
  114. var ins Instrument
  115. return ins, errors.New("no ins")
  116. }
  117. func (s *TickServer) logAction() {
  118. s.server.RegisterAction(msq.MsgTK, func(mtf *msq.MsqServer, msg *msq.Message) {
  119. //send to chan
  120. if msg.Type == msq.MsgTK {
  121. mk := msg.GetData().(*Market)
  122. //log.Println("log fuck")
  123. for key, sub := range s.subss {
  124. err := sub.Send(mk)
  125. if err == ErrChanClosed {
  126. delete(s.subss, key)
  127. }
  128. if err == ErrChanBusy {
  129. sub.Clear()
  130. sub.Send(mk)
  131. }
  132. }
  133. //set ticks
  134. ty := DataTypeName(int(mk.Type))
  135. insId := mk.InsId
  136. if data, ok := s.tickss[ty]; ok {
  137. data[insId] = mk
  138. } else {
  139. s.tickss[ty] = make(map[int64]*Market)
  140. s.tickss[ty][insId] = mk
  141. }
  142. }
  143. s.logclient.Log(msg)
  144. })
  145. }
  // unixNow returns the current Unix time in seconds, converted to int32.
  // The conversion truncates once the second count no longer fits in 32 bits.
  func unixNow() int32 {
  	return int32(time.Now().Unix())
  }
  // nowTime returns the current time as Unix nanoseconds.
  func nowTime() int64 {
  	now := time.Now()
  	return now.UnixNano()
  }
  153. func (s *TickServer) ticksAction() {
  154. s.server.RegisterAction(msq.MsgTKGetTicks, func(mtf *msq.MsqServer, msg *msq.Message) {
  155. req := msg.GetData().(*TypeRequest)
  156. data, ok := s.tickss[req.Type]
  157. if !ok {
  158. msg.SetErr(errors.New("ticks::req.Type no data"))
  159. return
  160. }
  161. var ticks []*Market
  162. for _, tick := range data {
  163. ticks = append(ticks, tick)
  164. }
  165. msg.SetData(ticks)
  166. })
  167. }
  168. func (s *TickServer) instrumentsAction() {
  169. s.server.RegisterAction(msq.MsgInss, func(mtf *msq.MsqServer, msg *msq.Message) {
  170. req := msg.GetData().(*InstrumentsRequest)
  171. inss := s.GetInstruments(req.Type)
  172. if inss == nil {
  173. msg.SetErr(errors.New("instruments::req.Type no data"))
  174. return
  175. }
  176. msg.SetData(inss)
  177. })
  178. }
  179. func (s *TickServer) instrumentAction() {
  180. s.server.RegisterAction(msq.MsgIns, func(mtf *msq.MsqServer, msg *msq.Message) {
  181. req := msg.GetData().(*InstrumentRequest)
  182. ins, err := s.GetInstrument(req.Type, req.Id)
  183. if err != nil {
  184. msg.SetErr(errors.New("instrument::req.Type no data"))
  185. return
  186. }
  187. msg.SetData(ins)
  188. })
  189. }
  190. func (s *TickServer) downloadAction() {
  191. s.server.RegisterAction(msq.MsgTKDown, func(mtf *msq.MsqServer, msg *msq.Message) {
  192. s.queryclient.Query(msg)
  193. })
  194. }
  195. func (s *TickServer) historyAction() {
  196. s.server.RegisterAction(msq.MsgTKHis, func(mtf *msq.MsqServer, msg *msq.Message) {
  197. s.queryclient.Query(msg)
  198. })
  199. }
  200. func (s *TickServer) subAction() {
  201. s.server.RegisterAction(msq.MsgTKSub, func(mtf *msq.MsqServer, msg *msq.Message) {
  202. req := msg.GetData().(*StreamRequest)
  203. sub, err := s.Subscibe(req.Type)
  204. msg.SetData(sub)
  205. msg.SetErr(err)
  206. })
  207. }
  208. func (s *TickServer) Start() {
  209. s.server.Start()
  210. }
  211. func (s *TickServer) Close() error {
  212. return s.server.Close()
  213. }
  214. func (s *TickServer) Subscibe(ty string) (*Subscribe, error) {
  215. s.subId++
  216. sub := &Subscribe{}
  217. sub.server = s
  218. sub.ty = ty
  219. sub.tyId = int32(TypeId(ty))
  220. sub.id = s.subId
  221. sub.chmk = make(chan *Market, 1024)
  222. s.subss[sub.id] = sub
  223. return sub, nil
  224. }
  225. type Subscribe struct {
  226. server *TickServer
  227. chmk chan *Market
  228. ty string
  229. id int64
  230. tyId int32
  231. isclosed bool
  232. mu sync.Mutex
  233. }
  234. func (sub *Subscribe) Close() {
  235. sub.mu.Lock()
  236. defer sub.mu.Unlock()
  237. sub.isclosed = true
  238. }
  239. func (sub *Subscribe) Clear() {
  240. for {
  241. select {
  242. case <-sub.chmk:
  243. //if mk.Type == IntTdx && mk.InsId == 1 {
  244. //log.Println("[subscribe]data trace", mk.Type, mk.InsId)
  245. //}
  246. default:
  247. return
  248. }
  249. }
  250. }
  251. func (sub *Subscribe) IsClosed() bool {
  252. sub.mu.Lock()
  253. defer sub.mu.Unlock()
  254. return sub.isclosed
  255. }
  256. func (sub *Subscribe) Send(mk *Market) error {
  257. if sub.IsClosed() {
  258. return ErrChanClosed
  259. }
  260. if mk.Type != sub.tyId {
  261. return nil
  262. }
  263. //尝试发送
  264. select {
  265. case sub.chmk <- mk:
  266. default:
  267. return ErrChanBusy
  268. }
  269. return nil
  270. }