// +build linux windows,386 // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现ctp(期货)数据源接口, 实时数据和历史数据的获取和保存 import ( "errors" "log" "os" "strconv" "strings" "sync" "time" "tickserver/api/gocctp" "tickserver/server/market" "golang.org/x/text/encoding/simplifiedchinese" ) type myMdSpi struct { gocctp.SpiBase ds *CtpDS } func (spi *myMdSpi) OnRspUserLogin(errMsg string) { spi.ds.OnMdRspUserLogin(errMsg) } func (spi *myMdSpi) OnRtnDepthMarketData(field *gocctp.CThostFtdcDepthMarketDataField) { spi.ds.OnRtnDepthMarketData(field) } // CtpDS实现了dataSource接口, 并对ctp的历史数据和实时数据保存 type CtpDS struct { gocctp.SpiBase mdApi *gocctp.MdApi tdApi *gocctp.TdApi conf *DsConf // 配置 *DSBase mu sync.Mutex //insMap map[string]*market.Instrument // lmax产品列表 insMappingMap map[string]int64 insMappings map[int64]string mdLogined bool dmkdfCh chan *gocctp.CThostFtdcDepthMarketDataField } func init() { drivers[Ctp] = newCtpDS } func newCtpDS(conf *DsConf) (DataSource, error) { log.Println("newCtpDS") mdspi := &myMdSpi{} mdDir := conf.SaveDir + "/MD/" os.MkdirAll(mdDir, 0777) mdApi := gocctp.NewMdApi(mdspi, mdDir) cds := &CtpDS{ DSBase: NewDsBase(conf), mdApi: mdApi, conf: conf, //insMap: make(map[string]*market.Instrument), insMappingMap: make(map[string]int64), insMappings: make(map[int64]string), dmkdfCh: make(chan *gocctp.CThostFtdcDepthMarketDataField, 1), } tdDir := conf.SaveDir + "/TD/" os.MkdirAll(tdDir, 0777) tdApi := gocctp.NewTdApi(cds, tdDir) cds.tdApi = tdApi mdspi.ds = cds ctpTypMap = make(map[string]int) for k, v := range ctpTyps { ctpTypMap[v] = k } return cds, nil } //func (cds *CtpDS) SubIns() *event.Event { //return cds.insPublisher.Event() //} func (cds *CtpDS) onMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) { insId := getInsId(pDepthMarketData.InstrumentID.String()) intInsId := cds.insIdMapping(insId) cds.mu.Lock() ins, ok := cds.insMap[intInsId] cds.mu.Unlock() if !ok { log.Println("insId NOT in cds:", insId) return } t, its := cds.convTime(pDepthMarketData.TradingDay.String(), pDepthMarketData.UpdateTime.String()) h := t.Hour() if h > 20 || h < 6 { // 夜盘 w := t.Weekday() if w == time.Monday { // 交易日是周一 if h > 20 { its -= 3600 * 24 * 3 // 时间应该是周五晚上 } else { // h < 6 its -= 3600 * 24 * 2 // 时间应该是周六凌晨 } } else if h > 20 { its -= 3600 * 24 // 时间减去24小时 } } ts := its*1000 + int64(pDepthMarketData.UpdateMillisec) if !checkTime(ins.ExId, ts) { //log.Println("checkTime false:", insId, ins.ExId, market.GetTime(ts)) return } bids := make([]PP, 5) bids[0][0] = float64(pDepthMarketData.BidPrice1) bids[1][0] = float64(pDepthMarketData.BidPrice2) bids[2][0] = float64(pDepthMarketData.BidPrice3) bids[3][0] = float64(pDepthMarketData.BidPrice4) bids[4][0] = float64(pDepthMarketData.BidPrice5) bids[0][1] = float64(pDepthMarketData.BidVolume1) bids[1][1] = float64(pDepthMarketData.BidVolume2) bids[2][1] = float64(pDepthMarketData.BidVolume3) bids[3][1] = float64(pDepthMarketData.BidVolume4) bids[4][1] = float64(pDepthMarketData.BidVolume5) asks := make([]PP, 5) asks[0][0] = float64(pDepthMarketData.AskPrice1) asks[1][0] = float64(pDepthMarketData.AskPrice2) asks[2][0] = float64(pDepthMarketData.AskPrice3) asks[3][0] = float64(pDepthMarketData.AskPrice4) asks[4][0] = float64(pDepthMarketData.AskPrice5) asks[0][1] = float64(pDepthMarketData.AskVolume1) asks[1][1] = float64(pDepthMarketData.AskVolume2) asks[2][1] = float64(pDepthMarketData.AskVolume3) asks[3][1] = float64(pDepthMarketData.AskVolume4) asks[4][1] = float64(pDepthMarketData.AskVolume5) //mk := ins.GetMk() mk := &Market{} mk.Type = IntCtp mk.Timestamp = ts mk.LastPrice = float64(pDepthMarketData.LastPrice) mk.Bids = bids mk.Asks = asks mk.High = float64(pDepthMarketData.HighestPrice) mk.Open = float64(pDepthMarketData.OpenPrice) mk.Low = float64(pDepthMarketData.LowestPrice) mk.Close = float64(pDepthMarketData.PreClosePrice) mk.AllAmount = float64(pDepthMarketData.Turnover) // 计算交易量 从总交易量AllVolume - 上一次的 == 本次交易量 oldVol := mk.AllVolume mk.AllVolume = float64(pDepthMarketData.Volume) if oldVol == 0 { oldVol = mk.AllVolume } mk.LastVolume = mk.AllVolume - oldVol if mk.LastVolume < 0 { mk.LastVolume = mk.AllVolume } mk.InsId = intInsId if intInsId == 0 { log.Println("error insid", insId) } //ins.SetMk(mk) //if pDepthMarketData.InstrumentID.String() == "IC1507" || //pDepthMarketData.InstrumentID.String() == "IF1507" || //pDepthMarketData.InstrumentID.String() == "IH1507" { //tt := time.Unix(mk.Timestamp/1000, (mk.Timestamp%1000)*(1e6)) //log.Println("debug checktime", tt, ins.ExId, pDepthMarketData.InstrumentID.String()) //} cds.Save(mk) } func (cds *CtpDS) convTime(sd, st string) (time.Time, int64) { t, n := cds.convDate(sd) if n == -1 { t = time.Now() } ss := strings.Split(st, ":") h, _ := strconv.Atoi(ss[0]) m, _ := strconv.Atoi(ss[1]) s, _ := strconv.Atoi(ss[2]) t = time.Date(t.Year(), t.Month(), t.Day(), h, m, s, 0, time.Local) return t, t.Unix() } func (cds *CtpDS) convDate(st string) (time.Time, int64) { if len(st) < 6 { return time.Time{}, -1 } sy := st[:4] sm := st[4:6] sd := st[6:] y, _ := strconv.ParseInt(string(sy), 10, 64) m, _ := strconv.ParseInt(string(sm), 10, 64) d, _ := strconv.ParseInt(string(sd), 10, 64) t := time.Date(int(y), time.Month(m), int(d), 0, 0, 0, 0, time.Local) return t, t.Unix() * 1000 } // 大商所 var kindsOfDS = []string{"a", "b", "bb", "c", "fb", "i", "j", "jd", "jm", "l", "m", "p", "pp", "v", "y"} // 上期所 var kindsOfSQ = []string{"ag", "al", "au", "bu", "cu", "fu", "hc", "pb", "rb", "ru", "wr", "zn"} // 郑商所 var kindsOfZS = []string{"CF", "FG", "JR", "LR", "MA", "OL", "PM", "RI", "RM", "RS", "SF", "SM", "SR", "TA", "TC", "WH"} // 中金所 var kindsOfZJ = []string{"TF", "IF", "IH", "IC"} var pids = []string{} func init() { pids = append(pids, kindsOfDS...) pids = append(pids, kindsOfSQ...) pids = append(pids, kindsOfZS...) pids = append(pids, kindsOfZJ...) } func convInsId(iid, pid string) string { return market.CtpPrefix + iid //pid + iid[len(iid)-2:] } func getInsId(sid string) string { return market.CtpPrefix + sid } func (cds *CtpDS) onInstrument(pInstrument *gocctp.CThostFtdcInstrumentField) { pid := pInstrument.ProductID.String() trans := simplifiedchinese.GBK.NewDecoder() dst := make([]byte, 1024) for _, x := range pids { if pid == x { sname := pInstrument.InstrumentName.String() nDst, _, err := trans.Transform(dst, []byte(sname), true) if err == nil { sname = string(dst[0:nDst]) } sid := pInstrument.InstrumentID.String() _, ist := cds.convDate(pInstrument.OpenDate.String()) _, iet := cds.convDate(pInstrument.ExpireDate.String()) ins := &Instrument{ Id: cds.insIdMapping(convInsId(sid, x)), Name: sname, ExId: pInstrument.ExchangeID.String(), Type: Futures, PriceInc: float64(pInstrument.PriceTick), Margin: float64((pInstrument.LongMarginRatio + pInstrument.ShortMarginRatio) / 2), StartTime: ist, EndTime: iet + (3600 * 24 * 1000), // 加当天时间 } cds.insMap[ins.Id] = ins //bGoods := 1 //if ins.ExId == CFFEX { //bGoods = 0 //} //ctpHisName := fmt.Sprintf("%s:%d", sid, bGoods) //ctpHisNames = append(ctpHisNames, ctpHisName) break } } } func (cds *CtpDS) OnRtnDepthMarketData(pDepthMarketData *gocctp.CThostFtdcDepthMarketDataField) { select { case cds.dmkdfCh <- pDepthMarketData: default: } } func (cds *CtpDS) OnRspQryExchange(field *gocctp.CThostFtdcExchangeField, errMsg string, isLast bool) { if errMsg != "" { log.Println(errMsg) return } } func (cds *CtpDS) OnRspQryInstrument(field *gocctp.CThostFtdcInstrumentField, errMsg string, isLast bool) { if errMsg != "" { log.Println(errMsg) return } cds.onInstrument(field) //saveCtpHisNames() if isLast { if !cds.mdLogined { cds.mdApi.Login(cds.conf.Url2, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord) log.Println("Md login:", cds.conf.Url2) } else { cds.mdLogined = false } } } func (cds *CtpDS) OnRspUserLogin(errMsg string) { trans := simplifiedchinese.GBK.NewDecoder() dst := make([]byte, len(errMsg)*2) nDst, _, err := trans.Transform(dst, []byte(errMsg), true) if err == nil { errMsg = string(dst[0:nDst]) } if errMsg != "" { log.Println(errMsg) return } log.Println("OnRspUserLogin OK") cds.tdApi.QryInstrument("") } func (cds *CtpDS) OnMdRspUserLogin(errMsg string) { if errMsg != "" { log.Println(errMsg) return } cds.mdLogined = true log.Println("OnMdRspUserLogin OK") insIds := []string{} cds.mu.Lock() for id, v := range cds.insMap { idstr, ok := cds.insMappings[id] if ok { insIds = append(insIds, market.RealInsId(idstr)) } else { log.Println("error ins:", v) } } cds.mu.Unlock() //conf if cds.conf.Symbols != "" { insIds = getSymbols(insIds, cds.conf.Symbols) } cds.mdApi.SubscribeMarketData(insIds) log.Println("SubscribeMarketData", insIds) } func getSymbols(insIds []string, symbols string) []string { insmap := make(map[string]bool) for i := 0; i < len(insIds); i++ { insmap[insIds[i]] = true } symarr := strings.Split(symbols, ",") var ret []string for i := 0; i < len(symarr); i++ { item1 := strings.ToLower(symarr[i]) item2 := strings.ToUpper(symarr[i]) if insmap[item1] { ret = append(ret, item1) } if insmap[item2] { ret = append(ret, item2) } } return ret } func (cds *CtpDS) Name() string { return Ctp } func (cds *CtpDS) Run() { log.Println("CtpDS.Run") cds.tdApi.Login(cds.conf.Url, cds.conf.BrokerId, cds.conf.User, cds.conf.PassWord, "", "") log.Println("Td login:", cds.conf.Url) go func() { for dmkd := range cds.dmkdfCh { cds.onMarketData(dmkd) } }() //cds.RunSave(32) } /*func (cds *CtpDS) runHour() { ht := time.Tick(time.Hour) for t := range ht { cds.mu.Lock() for k, ins := range cds.insMap { if t.Hour() == 6 { mk := ins.GetMk() mk.Volume = 0 cds.Save(mk) } if time.Now().Unix()*1000 > ins.EndTime { log.Println("ins expired:", ins.Name, ins.Id, market.GetTime(ins.EndTime)) delete(cds.insMap, k) cds.Del(k) // 指示保存 } } cds.mu.Unlock() } }*/ // 上海期货交易所: //上午 09:00 -- 10:15 10:30 -- 11:30 //下午 13:30 -- 14:10 14:20 -- 15:00 //夜盘 21:00 -- 02:30 //大连、郑州商品交易所: //上午09:00 -- 10:15 10:30 -- 11:30 //下午 13:30 -- 15:00 //中国金融期货交易所:(沪深300期货标准合约) //平时交易时间9:15--11:30 13:00---15:15 //交割日交易时间为 9:15--11:30 13:00---15:00 func (cds *CtpDS) getInsId(insId string) (int64, error) { for k, v := range insId { if v >= '0' && v <= '9' { insTyp := strings.ToUpper(insId[4:k]) insSuffix := insId[k:] intTyp, ok := ctpTypMap[insTyp] if !ok { log.Println("ins type error:", insTyp, insSuffix, insId) return 0, errors.New("ins type err.") } intSuffix, err := strconv.Atoi(insSuffix) if err != nil { log.Println("ins type error:", insTyp, insSuffix, insId) return 0, errors.New("ins type err.") } var id int64 id = int64(intTyp)*10000 + int64(intSuffix) return id, nil } } log.Println("ins type error:", insId) return 0, errors.New("ins type err.") } func (cds *CtpDS) insIdMapping(insId string) int64 { id, ok := cds.insMappingMap[insId] if !ok { var err error id, err = cds.getInsId(insId) if err != nil { return 0 } cds.insMappingMap[insId] = id cds.insMappings[id] = insId } return id }