// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现大智慧数据源接口, 实时数据和历史数据的获取和保存 import ( "encoding/binary" "errors" "io" "log" "net" "strconv" "strings" "time" "tickserver/server/market" "golang.org/x/text/encoding/simplifiedchinese" ) // DzhDS实现了dataSource接口, 并对dzh的历史数据和实时数据保存 type DzhDS struct { *DSBase conf *DsConf //insMap map[string]*market.Instrument tcp net.Conn } func init() { drivers[Dzh] = newDzhDS } func newDzhDS(conf *DsConf) (DataSource, error) { tcp, err := net.DialTimeout("tcp", conf.Url, 2*time.Second) if err != nil { return nil, err } return &DzhDS{ DSBase: NewDsBase(conf), conf: conf, tcp: tcp, //insMap: make(map[string]*market.Instrument), }, nil } func (dds *DzhDS) Name() string { return Dzh } //func (dds *DzhDS) SubIns() *event.Event { //return dds.insPublisher.Event() //} func (dds *DzhDS) Run() { log.Println("DzhDS.Run") //go dds.RunSave(64) for { err := dds.read(dds.tcp) if err != nil { log.Println(err) dds.tcp, err = net.Dial("tcp", dds.conf.Url) if err != nil { log.Fatal(err) } continue } } } func (dds *DzhDS) read(r io.Reader) error { var t int32 err := binary.Read(r, binary.LittleEndian, &t) if err != nil { return errors.New("read data TYPE error:" + err.Error()) } var count, size uint32 err = binary.Read(r, binary.LittleEndian, &count) if err != nil { return errors.New("read data COUNT error:" + err.Error()) } err = binary.Read(r, binary.LittleEndian, &size) if err != nil { return errors.New("read data SIZE error:" + err.Error()) } switch t { case 0: return dds.resoleL1(r, int(count)) case 1: return dds.resoleL2(r, int(count)) case 2: for i := 0; i < int(count); i++ { mbi := MarketBoardInfo{} err = binary.Read(r, binary.LittleEndian, &mbi) if err != nil { log.Println("read MarketBoardInfo error:", err) } } default: log.Fatal("can't go here") } return nil } type Ext struct { Code [16]byte Name [16]byte PrevClose uint32 TopLtd uint32 BotLtd uint32 } // 五档行情数据结构 type QuoteL1 struct { Ext Number int16 Tim32 int32 Open int32 High int32 Low int32 New int32 AllVol int32 AllAmount int32 MarketVal int32 Reserver2 int32 PricesOfBid [5]int32 VolumeOfBid [5]int32 PricesOfAsk [5]int32 VolumeOfAsk [5]int32 Zero [2]int32 } func QuoteL1ToMD(ql1 *QuoteL1, ins *Instrument, insIdStr string) *Market { mk := &Market{} mk.InsId = ins.Id mk.Type = IntDzh factor := 100.0 sid := market.RealInsId(insIdStr) if strings.HasPrefix(sid, "SH510") || strings.HasPrefix(sid, "SZ160") { factor = 1000.0 } mk.Asks = make([]PP, 10) mk.Bids = make([]PP, 10) for i := 0; i < 5; i++ { mk.Asks[i][0] = float64(ql1.PricesOfAsk[i]) / factor mk.Asks[i][1] = float64(ql1.VolumeOfAsk[i]) * 100. mk.Bids[i][0] = float64(ql1.PricesOfBid[i]) / factor mk.Bids[i][1] = float64(ql1.VolumeOfBid[i]) * 100. } mk.Close = float64(ql1.PrevClose) / factor mk.Open = float64(ql1.Open) / factor mk.LastPrice = float64(ql1.New) / factor mk.Timestamp = int64(ql1.Tim32) * 1000 oldVol := mk.AllVolume mk.AllVolume = float64(ql1.AllVol) * 100. mk.LastVolume = mk.AllVolume - oldVol if mk.LastVolume < 0 { mk.LastVolume = mk.AllVolume } mk.AllAmount = float64(ql1.AllAmount) return mk } // 十档行情数据结构 type QuoteL2 struct { Ext Market [2]byte Number int16 BuyEven float32 BuyVol float32 SellEven float32 SellVol float32 PricesOfBid [5]float32 VolumeOfBid [5]float32 PricesOfAsk [5]float32 VolumeOfAsk [5]float32 } func QuoteL2ToMD(ql2 *QuoteL2, ins *Instrument) *Market { //mk := ins.GetMk() mk := &Market{} mk.Type = IntDzh mk.InsId = ins.Id if len(mk.Asks) == 0 { mk.Asks = make([]PP, 10) } if len(mk.Bids) == 0 { mk.Bids = make([]PP, 10) } for i := 0; i < 5; i++ { mk.Asks[i+5][0] = float64(ql2.PricesOfAsk[i]) mk.Asks[i+5][1] = float64(ql2.VolumeOfAsk[i]) * 100 mk.Bids[i+5][0] = float64(ql2.PricesOfBid[i]) mk.Bids[i+5][1] = float64(ql2.VolumeOfBid[i]) * 100 } //ins.SetMk(mk) // log.Printf("@@@@@@@:%+v\n", mk) return mk } type MarketBoardInfo struct { Code [16]byte Info [48]byte } func (dds *DzhDS) resole(r io.Reader, count int, isL1 bool) ([]*QuoteL1, []*QuoteL2, error) { ql1s := make([]*QuoteL1, count) ql2s := make([]*QuoteL2, count) for i := 0; i < int(count); i++ { if isL1 { ql1 := &QuoteL1{} err := binary.Read(r, binary.LittleEndian, ql1) if err != nil { return nil, nil, errors.New("read QuoteL1 error:" + err.Error()) } ql1s[i] = ql1 } else { ql2 := &QuoteL2{} err := binary.Read(r, binary.LittleEndian, ql2) if err != nil { return nil, nil, errors.New("read QuoteL2 error:" + err.Error()) } ql2s[i] = ql2 } } return ql1s, ql2s, nil } func (dds *DzhDS) addIns(ext Ext) (*Instrument, string) { sid := b2s(ext.Code[:]) exid := market.SHEX if strings.Contains(sid, market.SZEX) { exid = market.SZEX } sname := b2s(ext.Name[:]) trans := simplifiedchinese.GBK.NewDecoder() dst := make([]byte, 1024) insIdStr := market.DzhPrefix + sid if len(sid) != 8 { log.Println("wrong sid", sid) return nil, "" } insId, _ := strconv.ParseInt(sid[2:], 10, 64) ins, ok := dds.insMap[insId] if !ok { nDst, _, err := trans.Transform(dst, []byte(sname), true) if err == nil { sname = string(dst[0:nDst]) } ins = &Instrument{ Id: insId, Name: sname, Type: market.Securities, ExId: exid, PriceInc: 0.01, } dds.insMap[insId] = ins //log.Println(ins) //dds.insPublisher.Publish(ins) } return ins, insIdStr } func (dds *DzhDS) resoleL1(r io.Reader, count int) error { ql1s, _, err := dds.resole(r, count, true) if err != nil { log.Println(err) return err } for _, q := range ql1s { ins, insIdStr := dds.addIns(q.Ext) if ins == nil { //log.Fatal("@@@: DzhDS.resoleL1 ins == nil") continue } mk := QuoteL1ToMD(q, ins, insIdStr) //ins.SetMk(mk) if mk.LastVolume == 0 { continue // 交易量为0, 不存储 } dds.Save(mk) } return nil } func b2s(b []byte) string { for i, c := range b { if c == 0 { return string(b[:i]) } } return string(b[:]) } func (dds *DzhDS) resoleL2(r io.Reader, count int) error { _, ql2s, err := dds.resole(r, count, false) if err != nil { log.Println(err) return err } for _, q := range ql2s { ins, _ := dds.addIns(q.Ext) if ins == nil { continue } mk := QuoteL2ToMD(q, ins) dds.Save(mk) } return nil }