123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现大智慧数据源接口, 实时数据和历史数据的获取和保存
- import (
- "encoding/binary"
- "errors"
- "io"
- "log"
- "net"
- "strconv"
- "strings"
- "time"
- "tickserver/server/market"
- "golang.org/x/text/encoding/simplifiedchinese"
- )
- // DzhDS实现了dataSource接口, 并对dzh的历史数据和实时数据保存
- type DzhDS struct {
- *DSBase
- conf *DsConf
- //insMap map[string]*market.Instrument
- tcp net.Conn
- }
- func init() {
- drivers[Dzh] = newDzhDS
- }
- func newDzhDS(conf *DsConf) (DataSource, error) {
- tcp, err := net.DialTimeout("tcp", conf.Url, 2*time.Second)
- if err != nil {
- return nil, err
- }
- return &DzhDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- tcp: tcp,
- //insMap: make(map[string]*market.Instrument),
- }, nil
- }
- func (dds *DzhDS) Name() string {
- return Dzh
- }
- //func (dds *DzhDS) SubIns() *event.Event {
- //return dds.insPublisher.Event()
- //}
- func (dds *DzhDS) Run() {
- log.Println("DzhDS.Run")
- //go dds.RunSave(64)
- for {
- err := dds.read(dds.tcp)
- if err != nil {
- log.Println(err)
- dds.tcp, err = net.Dial("tcp", dds.conf.Url)
- if err != nil {
- log.Fatal(err)
- }
- continue
- }
- }
- }
- func (dds *DzhDS) read(r io.Reader) error {
- var t int32
- err := binary.Read(r, binary.LittleEndian, &t)
- if err != nil {
- return errors.New("read data TYPE error:" + err.Error())
- }
- var count, size uint32
- err = binary.Read(r, binary.LittleEndian, &count)
- if err != nil {
- return errors.New("read data COUNT error:" + err.Error())
- }
- err = binary.Read(r, binary.LittleEndian, &size)
- if err != nil {
- return errors.New("read data SIZE error:" + err.Error())
- }
- switch t {
- case 0:
- return dds.resoleL1(r, int(count))
- case 1:
- return dds.resoleL2(r, int(count))
- case 2:
- for i := 0; i < int(count); i++ {
- mbi := MarketBoardInfo{}
- err = binary.Read(r, binary.LittleEndian, &mbi)
- if err != nil {
- log.Println("read MarketBoardInfo error:", err)
- }
- }
- default:
- log.Fatal("can't go here")
- }
- return nil
- }
// Ext is the fixed-size header embedded at the start of every quote record.
// It is decoded with encoding/binary, so the order and width of its fields
// define the wire layout and must not change.
type Ext struct {
	// Code and Name are NUL-padded byte fields. addIns turns Code into the
	// instrument code and decodes Name from GBK.
	Code, Name [16]byte

	// PrevClose is a scaled integer price; QuoteL1ToMD divides it by its
	// price factor. TopLtd and BotLtd are not read anywhere in this file —
	// presumably the upper and lower price limits, TODO confirm.
	PrevClose, TopLtd, BotLtd uint32
}
- // 五档行情数据结构
- type QuoteL1 struct {
- Ext
- Number int16
- Tim32 int32
- Open int32
- High int32
- Low int32
- New int32
- AllVol int32
- AllAmount int32
- MarketVal int32
- Reserver2 int32
- PricesOfBid [5]int32
- VolumeOfBid [5]int32
- PricesOfAsk [5]int32
- VolumeOfAsk [5]int32
- Zero [2]int32
- }
- func QuoteL1ToMD(ql1 *QuoteL1, ins *Instrument, insIdStr string) *Market {
- mk := &Market{}
- mk.InsId = ins.Id
- mk.Type = IntDzh
- factor := 100.0
- sid := market.RealInsId(insIdStr)
- if strings.HasPrefix(sid, "SH510") || strings.HasPrefix(sid, "SZ160") {
- factor = 1000.0
- }
- mk.Asks = make([]PP, 10)
- mk.Bids = make([]PP, 10)
- for i := 0; i < 5; i++ {
- mk.Asks[i][0] = float64(ql1.PricesOfAsk[i]) / factor
- mk.Asks[i][1] = float64(ql1.VolumeOfAsk[i]) * 100.
- mk.Bids[i][0] = float64(ql1.PricesOfBid[i]) / factor
- mk.Bids[i][1] = float64(ql1.VolumeOfBid[i]) * 100.
- }
- mk.Close = float64(ql1.PrevClose) / factor
- mk.Open = float64(ql1.Open) / factor
- mk.LastPrice = float64(ql1.New) / factor
- mk.Timestamp = int64(ql1.Tim32) * 1000
- oldVol := mk.AllVolume
- mk.AllVolume = float64(ql1.AllVol) * 100.
- mk.LastVolume = mk.AllVolume - oldVol
- if mk.LastVolume < 0 {
- mk.LastVolume = mk.AllVolume
- }
- mk.AllAmount = float64(ql1.AllAmount)
- return mk
- }
- // 十档行情数据结构
- type QuoteL2 struct {
- Ext
- Market [2]byte
- Number int16
- BuyEven float32
- BuyVol float32
- SellEven float32
- SellVol float32
- PricesOfBid [5]float32
- VolumeOfBid [5]float32
- PricesOfAsk [5]float32
- VolumeOfAsk [5]float32
- }
- func QuoteL2ToMD(ql2 *QuoteL2, ins *Instrument) *Market {
- //mk := ins.GetMk()
- mk := &Market{}
- mk.Type = IntDzh
- mk.InsId = ins.Id
- if len(mk.Asks) == 0 {
- mk.Asks = make([]PP, 10)
- }
- if len(mk.Bids) == 0 {
- mk.Bids = make([]PP, 10)
- }
- for i := 0; i < 5; i++ {
- mk.Asks[i+5][0] = float64(ql2.PricesOfAsk[i])
- mk.Asks[i+5][1] = float64(ql2.VolumeOfAsk[i]) * 100
- mk.Bids[i+5][0] = float64(ql2.PricesOfBid[i])
- mk.Bids[i+5][1] = float64(ql2.VolumeOfBid[i]) * 100
- }
- //ins.SetMk(mk)
- // log.Printf("@@@@@@@:%+v\n", mk)
- return mk
- }
// MarketBoardInfo is the 64-byte record carried by type 2 messages. The read
// method decodes these records with encoding/binary and discards them, so the
// field widths matter only for staying aligned with the stream.
type MarketBoardInfo struct {
	// Code is a 16-byte field (presumably a NUL-padded code like Ext.Code —
	// TODO confirm).
	Code [16]byte

	// Info is a 48-byte payload; nothing in this file interprets it.
	Info [48]byte
}
- func (dds *DzhDS) resole(r io.Reader, count int, isL1 bool) ([]*QuoteL1, []*QuoteL2, error) {
- ql1s := make([]*QuoteL1, count)
- ql2s := make([]*QuoteL2, count)
- for i := 0; i < int(count); i++ {
- if isL1 {
- ql1 := &QuoteL1{}
- err := binary.Read(r, binary.LittleEndian, ql1)
- if err != nil {
- return nil, nil, errors.New("read QuoteL1 error:" + err.Error())
- }
- ql1s[i] = ql1
- } else {
- ql2 := &QuoteL2{}
- err := binary.Read(r, binary.LittleEndian, ql2)
- if err != nil {
- return nil, nil, errors.New("read QuoteL2 error:" + err.Error())
- }
- ql2s[i] = ql2
- }
- }
- return ql1s, ql2s, nil
- }
- func (dds *DzhDS) addIns(ext Ext) (*Instrument, string) {
- sid := b2s(ext.Code[:])
- exid := market.SHEX
- if strings.Contains(sid, market.SZEX) {
- exid = market.SZEX
- }
- sname := b2s(ext.Name[:])
- trans := simplifiedchinese.GBK.NewDecoder()
- dst := make([]byte, 1024)
- insIdStr := market.DzhPrefix + sid
- if len(sid) != 8 {
- log.Println("wrong sid", sid)
- return nil, ""
- }
- insId, _ := strconv.ParseInt(sid[2:], 10, 64)
- ins, ok := dds.insMap[insId]
- if !ok {
- nDst, _, err := trans.Transform(dst, []byte(sname), true)
- if err == nil {
- sname = string(dst[0:nDst])
- }
- ins = &Instrument{
- Id: insId,
- Name: sname,
- Type: market.Securities,
- ExId: exid,
- PriceInc: 0.01,
- }
- dds.insMap[insId] = ins
- //log.Println(ins)
- //dds.insPublisher.Publish(ins)
- }
- return ins, insIdStr
- }
- func (dds *DzhDS) resoleL1(r io.Reader, count int) error {
- ql1s, _, err := dds.resole(r, count, true)
- if err != nil {
- log.Println(err)
- return err
- }
- for _, q := range ql1s {
- ins, insIdStr := dds.addIns(q.Ext)
- if ins == nil {
- //log.Fatal("@@@: DzhDS.resoleL1 ins == nil")
- continue
- }
- mk := QuoteL1ToMD(q, ins, insIdStr)
- //ins.SetMk(mk)
- if mk.LastVolume == 0 {
- continue // 交易量为0, 不存储
- }
- dds.Save(mk)
- }
- return nil
- }
// b2s converts a NUL-padded byte field to a string: it returns the bytes
// before the first zero byte, or all of b when there is no zero byte.
func b2s(b []byte) string {
	end := len(b)
	for i := 0; i < len(b); i++ {
		if b[i] == 0 {
			end = i
			break
		}
	}
	return string(b[:end])
}
- func (dds *DzhDS) resoleL2(r io.Reader, count int) error {
- _, ql2s, err := dds.resole(r, count, false)
- if err != nil {
- log.Println(err)
- return err
- }
- for _, q := range ql2s {
- ins, _ := dds.addIns(q.Ext)
- if ins == nil {
- continue
- }
- mk := QuoteL2ToMD(q, ins)
- dds.Save(mk)
- }
- return nil
- }
|