123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- // +build linux windows,386
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现ctp(期货)数据源接口, 实时数据和历史数据的获取和保存
- import (
- "errors"
- "log"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "tickserver/api/gocctp"
- "tickserver/server/market"
- "golang.org/x/text/encoding/simplifiedchinese"
- )
- type myMdSpi struct {
- gocctp.SpiBase
- ds *CtpDS
- }
- func (spi *myMdSpi) OnRspUserLogin(errMsg string) {
- spi.ds.OnMdRspUserLogin(errMsg)
- }
- func (spi *myMdSpi) OnRtnDepthMarketData(field *gocctp.CThostFtdcDepthMarketDataField) {
- spi.ds.OnRtnDepthMarketData(field)
- }
- // CtpDS实现了dataSource接口, 并对ctp的历史数据和实时数据保存
- type CtpDS struct {
- gocctp.SpiBase
- mdApi *gocctp.MdApi
- tdApi *gocctp.TdApi
- conf *DsConf // 配置
- *DSBase
- mu sync.Mutex
- //insMap map[string]*market.Instrument // lmax产品列表
- insMappingMap map[string]int64
- insMappings map[int64]string
- mdLogined bool
- dmkdfCh chan *gocctp.CThostFtdcDepthMarketDataField
- }
- func init() {
- drivers[Ctp] = newCtpDS
- }
- func newCtpDS(conf *DsConf) (DataSource, error) {
- log.Println("newCtpDS")
- mdspi := &myMdSpi{}
- mdDir := conf.SaveDir + "/MD/"
- os.MkdirAll(mdDir, 0777)
- mdApi := gocctp.NewMdApi(mdspi, mdDir)
- cds := &CtpDS{
- DSBase: NewDsBase(conf),
- mdApi: mdApi,
- conf: conf,
- //insMap: make(map[string]*market.Instrument),
- insMappingMap: make(map[string]int64),
- insMappings: make(map[int64]string),
- dmkdfCh: make(chan *gocctp.CThostFtdcDepthMarketDataField, 1),
- }
- tdDir := conf.SaveDir + "/TD/"
- os.MkdirAll(tdDir, 0777)
- tdApi := gocctp.NewTdApi(cds, tdDir)
- cds.tdApi = tdApi
- mdspi.ds = cds
- ctpTypMap = make(map[string]int)
- for k, v := range ctpTyps {
- ctpTypMap[v] = k
- }
- return cds, nil
- }
- //func (cds *CtpDS) SubIns() *event.Event {
- //return cds.insPublisher.Event()
- //}
- func (cds *CtpDS) onMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) {
- insId := getInsId(pDepthMarketData.InstrumentID.String())
- intInsId := cds.insIdMapping(insId)
- cds.mu.Lock()
- ins, ok := cds.insMap[intInsId]
- cds.mu.Unlock()
- if !ok {
- log.Println("insId NOT in cds:", insId)
- return
- }
- t, its := cds.convTime(pDepthMarketData.TradingDay.String(), pDepthMarketData.UpdateTime.String())
- h := t.Hour()
- if h > 20 || h < 6 { // 夜盘
- w := t.Weekday()
- if w == time.Monday { // 交易日是周一
- if h > 20 {
- its -= 3600 * 24 * 3 // 时间应该是周五晚上
- } else { // h < 6
- its -= 3600 * 24 * 2 // 时间应该是周六凌晨
- }
- } else if h > 20 {
- its -= 3600 * 24 // 时间减去24小时
- }
- }
- ts := its*1000 + int64(pDepthMarketData.UpdateMillisec)
- if !checkTime(ins.ExId, ts) {
- //log.Println("checkTime false:", insId, ins.ExId, market.GetTime(ts))
- return
- }
- bids := make([]PP, 5)
- bids[0][0] = float64(pDepthMarketData.BidPrice1)
- bids[1][0] = float64(pDepthMarketData.BidPrice2)
- bids[2][0] = float64(pDepthMarketData.BidPrice3)
- bids[3][0] = float64(pDepthMarketData.BidPrice4)
- bids[4][0] = float64(pDepthMarketData.BidPrice5)
- bids[0][1] = float64(pDepthMarketData.BidVolume1)
- bids[1][1] = float64(pDepthMarketData.BidVolume2)
- bids[2][1] = float64(pDepthMarketData.BidVolume3)
- bids[3][1] = float64(pDepthMarketData.BidVolume4)
- bids[4][1] = float64(pDepthMarketData.BidVolume5)
- asks := make([]PP, 5)
- asks[0][0] = float64(pDepthMarketData.AskPrice1)
- asks[1][0] = float64(pDepthMarketData.AskPrice2)
- asks[2][0] = float64(pDepthMarketData.AskPrice3)
- asks[3][0] = float64(pDepthMarketData.AskPrice4)
- asks[4][0] = float64(pDepthMarketData.AskPrice5)
- asks[0][1] = float64(pDepthMarketData.AskVolume1)
- asks[1][1] = float64(pDepthMarketData.AskVolume2)
- asks[2][1] = float64(pDepthMarketData.AskVolume3)
- asks[3][1] = float64(pDepthMarketData.AskVolume4)
- asks[4][1] = float64(pDepthMarketData.AskVolume5)
- //mk := ins.GetMk()
- mk := &Market{}
- mk.Type = IntCtp
- mk.Timestamp = ts
- mk.LastPrice = float64(pDepthMarketData.LastPrice)
- mk.Bids = bids
- mk.Asks = asks
- mk.High = float64(pDepthMarketData.HighestPrice)
- mk.Open = float64(pDepthMarketData.OpenPrice)
- mk.Low = float64(pDepthMarketData.LowestPrice)
- mk.Close = float64(pDepthMarketData.PreClosePrice)
- mk.AllAmount = float64(pDepthMarketData.Turnover)
- // 计算交易量 从总交易量AllVolume - 上一次的 == 本次交易量
- oldVol := mk.AllVolume
- mk.AllVolume = float64(pDepthMarketData.Volume)
- if oldVol == 0 {
- oldVol = mk.AllVolume
- }
- mk.LastVolume = mk.AllVolume - oldVol
- if mk.LastVolume < 0 {
- mk.LastVolume = mk.AllVolume
- }
- mk.InsId = intInsId
- if intInsId == 0 {
- log.Println("error insid", insId)
- }
- //ins.SetMk(mk)
- //if pDepthMarketData.InstrumentID.String() == "IC1507" ||
- //pDepthMarketData.InstrumentID.String() == "IF1507" ||
- //pDepthMarketData.InstrumentID.String() == "IH1507" {
- //tt := time.Unix(mk.Timestamp/1000, (mk.Timestamp%1000)*(1e6))
- //log.Println("debug checktime", tt, ins.ExId, pDepthMarketData.InstrumentID.String())
- //}
- cds.Save(mk)
- }
- func (cds *CtpDS) convTime(sd, st string) (time.Time, int64) {
- t, n := cds.convDate(sd)
- if n == -1 {
- t = time.Now()
- }
- ss := strings.Split(st, ":")
- h, _ := strconv.Atoi(ss[0])
- m, _ := strconv.Atoi(ss[1])
- s, _ := strconv.Atoi(ss[2])
- t = time.Date(t.Year(), t.Month(), t.Day(), h, m, s, 0, time.Local)
- return t, t.Unix()
- }
- func (cds *CtpDS) convDate(st string) (time.Time, int64) {
- if len(st) < 6 {
- return time.Time{}, -1
- }
- sy := st[:4]
- sm := st[4:6]
- sd := st[6:]
- y, _ := strconv.ParseInt(string(sy), 10, 64)
- m, _ := strconv.ParseInt(string(sm), 10, 64)
- d, _ := strconv.ParseInt(string(sd), 10, 64)
- t := time.Date(int(y), time.Month(m), int(d), 0, 0, 0, 0, time.Local)
- return t, t.Unix() * 1000
- }
// Product codes accepted by onInstrument, one list per exchange.
var (
	// Dalian Commodity Exchange.
	kindsOfDS = []string{"a", "b", "bb", "c", "fb", "i", "j", "jd", "jm", "l", "m", "p", "pp", "v", "y"}

	// Shanghai Futures Exchange.
	kindsOfSQ = []string{"ag", "al", "au", "bu", "cu", "fu", "hc", "pb", "rb", "ru", "wr", "zn"}

	// Zhengzhou Commodity Exchange.
	kindsOfZS = []string{"CF", "FG", "JR", "LR", "MA", "OL", "PM", "RI", "RM", "RS", "SF", "SM", "SR", "TA", "TC", "WH"}

	// China Financial Futures Exchange.
	kindsOfZJ = []string{"TF", "IF", "IH", "IC"}

	// pids is every product code above, concatenated by init.
	pids = []string{}
)

// init appends the four per-exchange lists to pids, in declaration order.
func init() {
	for _, kinds := range [][]string{kindsOfDS, kindsOfSQ, kindsOfZS, kindsOfZJ} {
		pids = append(pids, kinds...)
	}
}
- func convInsId(iid, pid string) string {
- return market.CtpPrefix + iid //pid + iid[len(iid)-2:]
- }
- func getInsId(sid string) string {
- return market.CtpPrefix + sid
- }
- func (cds *CtpDS) onInstrument(pInstrument *gocctp.CThostFtdcInstrumentField) {
- pid := pInstrument.ProductID.String()
- trans := simplifiedchinese.GBK.NewDecoder()
- dst := make([]byte, 1024)
- for _, x := range pids {
- if pid == x {
- sname := pInstrument.InstrumentName.String()
- nDst, _, err := trans.Transform(dst, []byte(sname), true)
- if err == nil {
- sname = string(dst[0:nDst])
- }
- sid := pInstrument.InstrumentID.String()
- _, ist := cds.convDate(pInstrument.OpenDate.String())
- _, iet := cds.convDate(pInstrument.ExpireDate.String())
- ins := &Instrument{
- Id: cds.insIdMapping(convInsId(sid, x)),
- Name: sname,
- ExId: pInstrument.ExchangeID.String(),
- Type: Futures,
- PriceInc: float64(pInstrument.PriceTick),
- Margin: float64((pInstrument.LongMarginRatio + pInstrument.ShortMarginRatio) / 2),
- StartTime: ist,
- EndTime: iet + (3600 * 24 * 1000), // 加当天时间
- }
- cds.insMap[ins.Id] = ins
- //bGoods := 1
- //if ins.ExId == CFFEX {
- //bGoods = 0
- //}
- //ctpHisName := fmt.Sprintf("%s:%d", sid, bGoods)
- //ctpHisNames = append(ctpHisNames, ctpHisName)
- break
- }
- }
- }
- func (cds *CtpDS) OnRtnDepthMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) {
- select {
- case cds.dmkdfCh <- pDepthMarketData:
- default:
- }
- }
- func (cds *CtpDS) OnRspQryExchange(field *gocctp.CThostFtdcExchangeField, errMsg string, isLast bool) {
- if errMsg != "" {
- log.Println(errMsg)
- return
- }
- }
- func (cds *CtpDS) OnRspQryInstrument(field *gocctp.CThostFtdcInstrumentField, errMsg string, isLast bool) {
- if errMsg != "" {
- log.Println(errMsg)
- return
- }
- cds.onInstrument(field)
- //saveCtpHisNames()
- if isLast {
- if !cds.mdLogined {
- cds.mdApi.Login(cds.conf.Url2, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord)
- log.Println("Md login:", cds.conf.Url2)
- } else {
- cds.mdLogined = false
- }
- }
- }
- func (cds *CtpDS) OnRspUserLogin(errMsg string) {
- trans := simplifiedchinese.GBK.NewDecoder()
- dst := make([]byte, len(errMsg)*2)
- nDst, _, err := trans.Transform(dst, []byte(errMsg), true)
- if err == nil {
- errMsg = string(dst[0:nDst])
- }
- if errMsg != "" {
- log.Println(errMsg)
- return
- }
- log.Println("OnRspUserLogin OK")
- cds.tdApi.QryInstrument("")
- }
- func (cds *CtpDS) OnMdRspUserLogin(errMsg string) {
- if errMsg != "" {
- log.Println(errMsg)
- return
- }
- cds.mdLogined = true
- log.Println("OnMdRspUserLogin OK")
- insIds := []string{}
- cds.mu.Lock()
- for id, v := range cds.insMap {
- idstr, ok := cds.insMappings[id]
- if ok {
- insIds = append(insIds, market.RealInsId(idstr))
- } else {
- log.Println("error ins:", v)
- }
- }
- cds.mu.Unlock()
- //conf
- if cds.conf.Symbols != "" {
- insIds = getSymbols(insIds, cds.conf.Symbols)
- }
- cds.mdApi.SubscribeMarketData(insIds)
- log.Println("SubscribeMarketData", insIds)
- }
// getSymbols filters insIds down to the instruments named in symbols.
//
// symbols is a comma-separated list. Each entry is trimmed of surrounding
// whitespace and looked up in insIds in its all-lowercase and all-uppercase
// form; every form that is present is returned. Results follow the order of
// symbols (lowercase form first) and each id is returned at most once, so an
// entry without letters (whose two forms coincide) or an entry listed twice
// no longer produces duplicates. Empty entries are skipped.
//
// The result is nil when nothing matches.
func getSymbols(insIds []string, symbols string) []string {
	known := make(map[string]bool, len(insIds))
	for _, id := range insIds {
		known[id] = true
	}

	var ret []string
	picked := make(map[string]bool)
	pick := func(id string) {
		if known[id] && !picked[id] {
			picked[id] = true
			ret = append(ret, id)
		}
	}

	for _, sym := range strings.Split(symbols, ",") {
		sym = strings.TrimSpace(sym)
		if sym == "" {
			continue
		}
		pick(strings.ToLower(sym))
		pick(strings.ToUpper(sym))
	}
	return ret
}
- func (cds *CtpDS) Name() string {
- return Ctp
- }
- func (cds *CtpDS) Run() {
- log.Println("CtpDS.Run")
- cds.tdApi.Login(cds.conf.Url, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord, "", "")
- log.Println("Td login:", cds.conf.Url)
- go func() {
- for dmkd := range cds.dmkdfCh {
- cds.onMarketData(dmkd)
- }
- }()
- //cds.RunSave(32)
- }
- /*func (cds *CtpDS) runHour() {
- ht := time.Tick(time.Hour)
- for t := range ht {
- cds.mu.Lock()
- for k, ins := range cds.insMap {
- if t.Hour() == 6 {
- mk := ins.GetMk()
- mk.Volume = 0
- cds.Save(mk)
- }
- if time.Now().Unix()*1000 > ins.EndTime {
- log.Println("ins expired:", ins.Name, ins.Id, market.GetTime(ins.EndTime))
- delete(cds.insMap, k)
- cds.Del(k) // 指示保存
- }
- }
- cds.mu.Unlock()
- }
- }*/
- // 上海期货交易所:
- //上午 09:00 -- 10:15 10:30 -- 11:30
- //下午 13:30 -- 14:10 14:20 -- 15:00
- //夜盘 21:00 -- 02:30
- //大连、郑州商品交易所:
- //上午09:00 -- 10:15 10:30 -- 11:30
- //下午 13:30 -- 15:00
- //中国金融期货交易所:(沪深300期货标准合约)
- //平时交易时间9:15--11:30 13:00---15:15
- //交割日交易时间为 9:15--11:30 13:00---15:00
- func (cds *CtpDS) getInsId(insId string) (int64, error) {
- for k, v := range insId {
- if v >= '0' && v <= '9' {
- insTyp := strings.ToUpper(insId[4:k])
- insSuffix := insId[k:]
- intTyp, ok := ctpTypMap[insTyp]
- if !ok {
- log.Println("ins type error:", insTyp, insSuffix, insId)
- return 0, errors.New("ins type err.")
- }
- intSuffix, err := strconv.Atoi(insSuffix)
- if err != nil {
- log.Println("ins type error:", insTyp, insSuffix, insId)
- return 0, errors.New("ins type err.")
- }
- var id int64
- id = int64(intTyp)*10000 + int64(intSuffix)
- return id, nil
- }
- }
- log.Println("ins type error:", insId)
- return 0, errors.New("ins type err.")
- }
- func (cds *CtpDS) insIdMapping(insId string) int64 {
- id, ok := cds.insMappingMap[insId]
- if !ok {
- var err error
- id, err = cds.getInsId(insId)
- if err != nil {
- return 0
- }
- cds.insMappingMap[insId] = id
- cds.insMappings[id] = insId
- }
- return id
- }
|