ds_ctp.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // +build linux windows,386
  2. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  3. package tick
  4. // 本文件实现ctp(期货)数据源接口, 实时数据和历史数据的获取和保存
  5. import (
  6. "errors"
  7. "log"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "tickserver/api/gocctp"
  14. "tickserver/server/market"
  15. "golang.org/x/text/encoding/simplifiedchinese"
  16. )
  17. type myMdSpi struct {
  18. gocctp.SpiBase
  19. ds *CtpDS
  20. }
  21. func (spi *myMdSpi) OnRspUserLogin(errMsg string) {
  22. spi.ds.OnMdRspUserLogin(errMsg)
  23. }
  24. func (spi *myMdSpi) OnRtnDepthMarketData(field *gocctp.CThostFtdcDepthMarketDataField) {
  25. spi.ds.OnRtnDepthMarketData(field)
  26. }
  27. // CtpDS实现了dataSource接口, 并对ctp的历史数据和实时数据保存
  28. type CtpDS struct {
  29. gocctp.SpiBase
  30. mdApi *gocctp.MdApi
  31. tdApi *gocctp.TdApi
  32. conf *DsConf // 配置
  33. *DSBase
  34. mu sync.Mutex
  35. //insMap map[string]*market.Instrument // lmax产品列表
  36. insMappingMap map[string]int64
  37. insMappings map[int64]string
  38. mdLogined bool
  39. dmkdfCh chan *gocctp.CThostFtdcDepthMarketDataField
  40. }
  41. func init() {
  42. drivers[Ctp] = newCtpDS
  43. }
  44. func newCtpDS(conf *DsConf) (DataSource, error) {
  45. log.Println("newCtpDS")
  46. mdspi := &myMdSpi{}
  47. mdDir := conf.SaveDir + "/MD/"
  48. os.MkdirAll(mdDir, 0777)
  49. mdApi := gocctp.NewMdApi(mdspi, mdDir)
  50. cds := &CtpDS{
  51. DSBase: NewDsBase(conf),
  52. mdApi: mdApi,
  53. conf: conf,
  54. //insMap: make(map[string]*market.Instrument),
  55. insMappingMap: make(map[string]int64),
  56. insMappings: make(map[int64]string),
  57. dmkdfCh: make(chan *gocctp.CThostFtdcDepthMarketDataField, 1),
  58. }
  59. tdDir := conf.SaveDir + "/TD/"
  60. os.MkdirAll(tdDir, 0777)
  61. tdApi := gocctp.NewTdApi(cds, tdDir)
  62. cds.tdApi = tdApi
  63. mdspi.ds = cds
  64. ctpTypMap = make(map[string]int)
  65. for k, v := range ctpTyps {
  66. ctpTypMap[v] = k
  67. }
  68. return cds, nil
  69. }
  70. //func (cds *CtpDS) SubIns() *event.Event {
  71. //return cds.insPublisher.Event()
  72. //}
  73. func (cds *CtpDS) onMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) {
  74. insId := getInsId(pDepthMarketData.InstrumentID.String())
  75. intInsId := cds.insIdMapping(insId)
  76. cds.mu.Lock()
  77. ins, ok := cds.insMap[intInsId]
  78. cds.mu.Unlock()
  79. if !ok {
  80. log.Println("insId NOT in cds:", insId)
  81. return
  82. }
  83. t, its := cds.convTime(pDepthMarketData.TradingDay.String(), pDepthMarketData.UpdateTime.String())
  84. h := t.Hour()
  85. if h > 20 || h < 6 { // 夜盘
  86. w := t.Weekday()
  87. if w == time.Monday { // 交易日是周一
  88. if h > 20 {
  89. its -= 3600 * 24 * 3 // 时间应该是周五晚上
  90. } else { // h < 6
  91. its -= 3600 * 24 * 2 // 时间应该是周六凌晨
  92. }
  93. } else if h > 20 {
  94. its -= 3600 * 24 // 时间减去24小时
  95. }
  96. }
  97. ts := its*1000 + int64(pDepthMarketData.UpdateMillisec)
  98. if !checkTime(ins.ExId, ts) {
  99. //log.Println("checkTime false:", insId, ins.ExId, market.GetTime(ts))
  100. return
  101. }
  102. bids := make([]PP, 5)
  103. bids[0][0] = float64(pDepthMarketData.BidPrice1)
  104. bids[1][0] = float64(pDepthMarketData.BidPrice2)
  105. bids[2][0] = float64(pDepthMarketData.BidPrice3)
  106. bids[3][0] = float64(pDepthMarketData.BidPrice4)
  107. bids[4][0] = float64(pDepthMarketData.BidPrice5)
  108. bids[0][1] = float64(pDepthMarketData.BidVolume1)
  109. bids[1][1] = float64(pDepthMarketData.BidVolume2)
  110. bids[2][1] = float64(pDepthMarketData.BidVolume3)
  111. bids[3][1] = float64(pDepthMarketData.BidVolume4)
  112. bids[4][1] = float64(pDepthMarketData.BidVolume5)
  113. asks := make([]PP, 5)
  114. asks[0][0] = float64(pDepthMarketData.AskPrice1)
  115. asks[1][0] = float64(pDepthMarketData.AskPrice2)
  116. asks[2][0] = float64(pDepthMarketData.AskPrice3)
  117. asks[3][0] = float64(pDepthMarketData.AskPrice4)
  118. asks[4][0] = float64(pDepthMarketData.AskPrice5)
  119. asks[0][1] = float64(pDepthMarketData.AskVolume1)
  120. asks[1][1] = float64(pDepthMarketData.AskVolume2)
  121. asks[2][1] = float64(pDepthMarketData.AskVolume3)
  122. asks[3][1] = float64(pDepthMarketData.AskVolume4)
  123. asks[4][1] = float64(pDepthMarketData.AskVolume5)
  124. //mk := ins.GetMk()
  125. mk := &Market{}
  126. mk.Type = IntCtp
  127. mk.Timestamp = ts
  128. mk.LastPrice = float64(pDepthMarketData.LastPrice)
  129. mk.Bids = bids
  130. mk.Asks = asks
  131. mk.High = float64(pDepthMarketData.HighestPrice)
  132. mk.Open = float64(pDepthMarketData.OpenPrice)
  133. mk.Low = float64(pDepthMarketData.LowestPrice)
  134. mk.Close = float64(pDepthMarketData.PreClosePrice)
  135. mk.AllAmount = float64(pDepthMarketData.Turnover)
  136. // 计算交易量 从总交易量AllVolume - 上一次的 == 本次交易量
  137. oldVol := mk.AllVolume
  138. mk.AllVolume = float64(pDepthMarketData.Volume)
  139. if oldVol == 0 {
  140. oldVol = mk.AllVolume
  141. }
  142. mk.LastVolume = mk.AllVolume - oldVol
  143. if mk.LastVolume < 0 {
  144. mk.LastVolume = mk.AllVolume
  145. }
  146. mk.InsId = intInsId
  147. if intInsId == 0 {
  148. log.Println("error insid", insId)
  149. }
  150. //ins.SetMk(mk)
  151. //if pDepthMarketData.InstrumentID.String() == "IC1507" ||
  152. //pDepthMarketData.InstrumentID.String() == "IF1507" ||
  153. //pDepthMarketData.InstrumentID.String() == "IH1507" {
  154. //tt := time.Unix(mk.Timestamp/1000, (mk.Timestamp%1000)*(1e6))
  155. //log.Println("debug checktime", tt, ins.ExId, pDepthMarketData.InstrumentID.String())
  156. //}
  157. cds.Save(mk)
  158. }
  159. func (cds *CtpDS) convTime(sd, st string) (time.Time, int64) {
  160. t, n := cds.convDate(sd)
  161. if n == -1 {
  162. t = time.Now()
  163. }
  164. ss := strings.Split(st, ":")
  165. h, _ := strconv.Atoi(ss[0])
  166. m, _ := strconv.Atoi(ss[1])
  167. s, _ := strconv.Atoi(ss[2])
  168. t = time.Date(t.Year(), t.Month(), t.Day(), h, m, s, 0, time.Local)
  169. return t, t.Unix()
  170. }
  171. func (cds *CtpDS) convDate(st string) (time.Time, int64) {
  172. if len(st) < 6 {
  173. return time.Time{}, -1
  174. }
  175. sy := st[:4]
  176. sm := st[4:6]
  177. sd := st[6:]
  178. y, _ := strconv.ParseInt(string(sy), 10, 64)
  179. m, _ := strconv.ParseInt(string(sm), 10, 64)
  180. d, _ := strconv.ParseInt(string(sd), 10, 64)
  181. t := time.Date(int(y), time.Month(m), int(d), 0, 0, 0, 0, time.Local)
  182. return t, t.Unix() * 1000
  183. }
  184. // 大商所
  185. var kindsOfDS = []string{"a", "b", "bb", "c", "fb", "i", "j", "jd", "jm", "l", "m", "p", "pp", "v", "y"}
  186. // 上期所
  187. var kindsOfSQ = []string{"ag", "al", "au", "bu", "cu", "fu", "hc", "pb", "rb", "ru", "wr", "zn"}
  188. // 郑商所
  189. var kindsOfZS = []string{"CF", "FG", "JR", "LR", "MA", "OL", "PM", "RI", "RM", "RS", "SF", "SM", "SR", "TA", "TC", "WH"}
  190. // 中金所
  191. var kindsOfZJ = []string{"TF", "IF", "IH", "IC"}
  192. var pids = []string{}
  193. func init() {
  194. pids = append(pids, kindsOfDS...)
  195. pids = append(pids, kindsOfSQ...)
  196. pids = append(pids, kindsOfZS...)
  197. pids = append(pids, kindsOfZJ...)
  198. }
  199. func convInsId(iid, pid string) string {
  200. return market.CtpPrefix + iid //pid + iid[len(iid)-2:]
  201. }
  202. func getInsId(sid string) string {
  203. return market.CtpPrefix + sid
  204. }
  205. func (cds *CtpDS) onInstrument(pInstrument *gocctp.CThostFtdcInstrumentField) {
  206. pid := pInstrument.ProductID.String()
  207. trans := simplifiedchinese.GBK.NewDecoder()
  208. dst := make([]byte, 1024)
  209. for _, x := range pids {
  210. if pid == x {
  211. sname := pInstrument.InstrumentName.String()
  212. nDst, _, err := trans.Transform(dst, []byte(sname), true)
  213. if err == nil {
  214. sname = string(dst[0:nDst])
  215. }
  216. sid := pInstrument.InstrumentID.String()
  217. _, ist := cds.convDate(pInstrument.OpenDate.String())
  218. _, iet := cds.convDate(pInstrument.ExpireDate.String())
  219. ins := &Instrument{
  220. Id: cds.insIdMapping(convInsId(sid, x)),
  221. Name: sname,
  222. ExId: pInstrument.ExchangeID.String(),
  223. Type: Futures,
  224. PriceInc: float64(pInstrument.PriceTick),
  225. Margin: float64((pInstrument.LongMarginRatio + pInstrument.ShortMarginRatio) / 2),
  226. StartTime: ist,
  227. EndTime: iet + (3600 * 24 * 1000), // 加当天时间
  228. }
  229. cds.insMap[ins.Id] = ins
  230. //bGoods := 1
  231. //if ins.ExId == CFFEX {
  232. //bGoods = 0
  233. //}
  234. //ctpHisName := fmt.Sprintf("%s:%d", sid, bGoods)
  235. //ctpHisNames = append(ctpHisNames, ctpHisName)
  236. break
  237. }
  238. }
  239. }
  240. func (cds *CtpDS) OnRtnDepthMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) {
  241. select {
  242. case cds.dmkdfCh <- pDepthMarketData:
  243. default:
  244. }
  245. }
  246. func (cds *CtpDS) OnRspQryExchange(field *gocctp.CThostFtdcExchangeField, errMsg string, isLast bool) {
  247. if errMsg != "" {
  248. log.Println(errMsg)
  249. return
  250. }
  251. }
  252. func (cds *CtpDS) OnRspQryInstrument(field *gocctp.CThostFtdcInstrumentField, errMsg string, isLast bool) {
  253. if errMsg != "" {
  254. log.Println(errMsg)
  255. return
  256. }
  257. cds.onInstrument(field)
  258. //saveCtpHisNames()
  259. if isLast {
  260. if !cds.mdLogined {
  261. cds.mdApi.Login(cds.conf.Url2, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord)
  262. log.Println("Md login:", cds.conf.Url2)
  263. } else {
  264. cds.mdLogined = false
  265. }
  266. }
  267. }
  268. func (cds *CtpDS) OnRspUserLogin(errMsg string) {
  269. trans := simplifiedchinese.GBK.NewDecoder()
  270. dst := make([]byte, len(errMsg)*2)
  271. nDst, _, err := trans.Transform(dst, []byte(errMsg), true)
  272. if err == nil {
  273. errMsg = string(dst[0:nDst])
  274. }
  275. if errMsg != "" {
  276. log.Println(errMsg)
  277. return
  278. }
  279. log.Println("OnRspUserLogin OK")
  280. cds.tdApi.QryInstrument("")
  281. }
  282. func (cds *CtpDS) OnMdRspUserLogin(errMsg string) {
  283. if errMsg != "" {
  284. log.Println(errMsg)
  285. return
  286. }
  287. cds.mdLogined = true
  288. log.Println("OnMdRspUserLogin OK")
  289. insIds := []string{}
  290. cds.mu.Lock()
  291. for id, v := range cds.insMap {
  292. idstr, ok := cds.insMappings[id]
  293. if ok {
  294. insIds = append(insIds, market.RealInsId(idstr))
  295. } else {
  296. log.Println("error ins:", v)
  297. }
  298. }
  299. cds.mu.Unlock()
  300. //conf
  301. if cds.conf.Symbols != "" {
  302. insIds = getSymbols(insIds, cds.conf.Symbols)
  303. }
  304. cds.mdApi.SubscribeMarketData(insIds)
  305. log.Println("SubscribeMarketData", insIds)
  306. }
  307. func getSymbols(insIds []string, symbols string) []string {
  308. insmap := make(map[string]bool)
  309. for i := 0; i < len(insIds); i++ {
  310. insmap[insIds[i]] = true
  311. }
  312. symarr := strings.Split(symbols, ",")
  313. var ret []string
  314. for i := 0; i < len(symarr); i++ {
  315. item1 := strings.ToLower(symarr[i])
  316. item2 := strings.ToUpper(symarr[i])
  317. if insmap[item1] {
  318. ret = append(ret, item1)
  319. }
  320. if insmap[item2] {
  321. ret = append(ret, item2)
  322. }
  323. }
  324. return ret
  325. }
  326. func (cds *CtpDS) Name() string {
  327. return Ctp
  328. }
  329. func (cds *CtpDS) Run() {
  330. log.Println("CtpDS.Run")
  331. cds.tdApi.Login(cds.conf.Url, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord, "", "")
  332. log.Println("Td login:", cds.conf.Url)
  333. go func() {
  334. for dmkd := range cds.dmkdfCh {
  335. cds.onMarketData(dmkd)
  336. }
  337. }()
  338. //cds.RunSave(32)
  339. }
  340. /*func (cds *CtpDS) runHour() {
  341. ht := time.Tick(time.Hour)
  342. for t := range ht {
  343. cds.mu.Lock()
  344. for k, ins := range cds.insMap {
  345. if t.Hour() == 6 {
  346. mk := ins.GetMk()
  347. mk.Volume = 0
  348. cds.Save(mk)
  349. }
  350. if time.Now().Unix()*1000 > ins.EndTime {
  351. log.Println("ins expired:", ins.Name, ins.Id, market.GetTime(ins.EndTime))
  352. delete(cds.insMap, k)
  353. cds.Del(k) // 指示保存
  354. }
  355. }
  356. cds.mu.Unlock()
  357. }
  358. }*/
// Shanghai Futures Exchange:
//   morning       09:00 -- 10:15, 10:30 -- 11:30
//   afternoon     13:30 -- 14:10, 14:20 -- 15:00
//   night session 21:00 -- 02:30
// Dalian and Zhengzhou Commodity Exchanges:
//   morning       09:00 -- 10:15, 10:30 -- 11:30
//   afternoon     13:30 -- 15:00
// China Financial Futures Exchange (CSI 300 futures standard contract):
//   regular trading hours      9:15 -- 11:30, 13:00 -- 15:15
//   delivery-day trading hours 9:15 -- 11:30, 13:00 -- 15:00

  369. func (cds *CtpDS) getInsId(insId string) (int64, error) {
  370. for k, v := range insId {
  371. if v >= '0' && v <= '9' {
  372. insTyp := strings.ToUpper(insId[4:k])
  373. insSuffix := insId[k:]
  374. intTyp, ok := ctpTypMap[insTyp]
  375. if !ok {
  376. log.Println("ins type error:", insTyp, insSuffix, insId)
  377. return 0, errors.New("ins type err.")
  378. }
  379. intSuffix, err := strconv.Atoi(insSuffix)
  380. if err != nil {
  381. log.Println("ins type error:", insTyp, insSuffix, insId)
  382. return 0, errors.New("ins type err.")
  383. }
  384. var id int64
  385. id = int64(intTyp)*10000 + int64(intSuffix)
  386. return id, nil
  387. }
  388. }
  389. log.Println("ins type error:", insId)
  390. return 0, errors.New("ins type err.")
  391. }
  392. func (cds *CtpDS) insIdMapping(insId string) int64 {
  393. id, ok := cds.insMappingMap[insId]
  394. if !ok {
  395. var err error
  396. id, err = cds.getInsId(insId)
  397. if err != nil {
  398. return 0
  399. }
  400. cds.insMappingMap[insId] = id
  401. cds.insMappings[id] = insId
  402. }
  403. return id
  404. }