ds_dzh.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现大智慧数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "encoding/binary"
  6. "errors"
  7. "io"
  8. "log"
  9. "net"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "tickserver/server/market"
  14. "golang.org/x/text/encoding/simplifiedchinese"
  15. )
  16. // DzhDS实现了dataSource接口, 并对dzh的历史数据和实时数据保存
  17. type DzhDS struct {
  18. *DSBase
  19. conf *DsConf
  20. //insMap map[string]*market.Instrument
  21. tcp net.Conn
  22. }
  23. func init() {
  24. drivers[Dzh] = newDzhDS
  25. }
  26. func newDzhDS(conf *DsConf) (DataSource, error) {
  27. tcp, err := net.DialTimeout("tcp", conf.Url, 2*time.Second)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return &DzhDS{
  32. DSBase: NewDsBase(conf),
  33. conf: conf,
  34. tcp: tcp,
  35. //insMap: make(map[string]*market.Instrument),
  36. }, nil
  37. }
  38. func (dds *DzhDS) Name() string {
  39. return Dzh
  40. }
  41. //func (dds *DzhDS) SubIns() *event.Event {
  42. //return dds.insPublisher.Event()
  43. //}
  44. func (dds *DzhDS) Run() {
  45. log.Println("DzhDS.Run")
  46. //go dds.RunSave(64)
  47. for {
  48. err := dds.read(dds.tcp)
  49. if err != nil {
  50. log.Println(err)
  51. dds.tcp, err = net.Dial("tcp", dds.conf.Url)
  52. if err != nil {
  53. log.Fatal(err)
  54. }
  55. continue
  56. }
  57. }
  58. }
  59. func (dds *DzhDS) read(r io.Reader) error {
  60. var t int32
  61. err := binary.Read(r, binary.LittleEndian, &t)
  62. if err != nil {
  63. return errors.New("read data TYPE error:" + err.Error())
  64. }
  65. var count, size uint32
  66. err = binary.Read(r, binary.LittleEndian, &count)
  67. if err != nil {
  68. return errors.New("read data COUNT error:" + err.Error())
  69. }
  70. err = binary.Read(r, binary.LittleEndian, &size)
  71. if err != nil {
  72. return errors.New("read data SIZE error:" + err.Error())
  73. }
  74. switch t {
  75. case 0:
  76. return dds.resoleL1(r, int(count))
  77. case 1:
  78. return dds.resoleL2(r, int(count))
  79. case 2:
  80. for i := 0; i < int(count); i++ {
  81. mbi := MarketBoardInfo{}
  82. err = binary.Read(r, binary.LittleEndian, &mbi)
  83. if err != nil {
  84. log.Println("read MarketBoardInfo error:", err)
  85. }
  86. }
  87. default:
  88. log.Fatal("can't go here")
  89. }
  90. return nil
  91. }
  92. type Ext struct {
  93. Code [16]byte
  94. Name [16]byte
  95. PrevClose uint32
  96. TopLtd uint32
  97. BotLtd uint32
  98. }
  99. // 五档行情数据结构
  100. type QuoteL1 struct {
  101. Ext
  102. Number int16
  103. Tim32 int32
  104. Open int32
  105. High int32
  106. Low int32
  107. New int32
  108. AllVol int32
  109. AllAmount int32
  110. MarketVal int32
  111. Reserver2 int32
  112. PricesOfBid [5]int32
  113. VolumeOfBid [5]int32
  114. PricesOfAsk [5]int32
  115. VolumeOfAsk [5]int32
  116. Zero [2]int32
  117. }
  118. func QuoteL1ToMD(ql1 *QuoteL1, ins *Instrument, insIdStr string) *Market {
  119. mk := &Market{}
  120. mk.InsId = ins.Id
  121. mk.Type = IntDzh
  122. factor := 100.0
  123. sid := market.RealInsId(insIdStr)
  124. if strings.HasPrefix(sid, "SH510") || strings.HasPrefix(sid, "SZ160") {
  125. factor = 1000.0
  126. }
  127. mk.Asks = make([]PP, 10)
  128. mk.Bids = make([]PP, 10)
  129. for i := 0; i < 5; i++ {
  130. mk.Asks[i][0] = float64(ql1.PricesOfAsk[i]) / factor
  131. mk.Asks[i][1] = float64(ql1.VolumeOfAsk[i]) * 100.
  132. mk.Bids[i][0] = float64(ql1.PricesOfBid[i]) / factor
  133. mk.Bids[i][1] = float64(ql1.VolumeOfBid[i]) * 100.
  134. }
  135. mk.Close = float64(ql1.PrevClose) / factor
  136. mk.Open = float64(ql1.Open) / factor
  137. mk.LastPrice = float64(ql1.New) / factor
  138. mk.Timestamp = int64(ql1.Tim32) * 1000
  139. oldVol := mk.AllVolume
  140. mk.AllVolume = float64(ql1.AllVol) * 100.
  141. mk.LastVolume = mk.AllVolume - oldVol
  142. if mk.LastVolume < 0 {
  143. mk.LastVolume = mk.AllVolume
  144. }
  145. mk.AllAmount = float64(ql1.AllAmount)
  146. return mk
  147. }
  148. // 十档行情数据结构
  149. type QuoteL2 struct {
  150. Ext
  151. Market [2]byte
  152. Number int16
  153. BuyEven float32
  154. BuyVol float32
  155. SellEven float32
  156. SellVol float32
  157. PricesOfBid [5]float32
  158. VolumeOfBid [5]float32
  159. PricesOfAsk [5]float32
  160. VolumeOfAsk [5]float32
  161. }
  162. func QuoteL2ToMD(ql2 *QuoteL2, ins *Instrument) *Market {
  163. //mk := ins.GetMk()
  164. mk := &Market{}
  165. mk.Type = IntDzh
  166. mk.InsId = ins.Id
  167. if len(mk.Asks) == 0 {
  168. mk.Asks = make([]PP, 10)
  169. }
  170. if len(mk.Bids) == 0 {
  171. mk.Bids = make([]PP, 10)
  172. }
  173. for i := 0; i < 5; i++ {
  174. mk.Asks[i+5][0] = float64(ql2.PricesOfAsk[i])
  175. mk.Asks[i+5][1] = float64(ql2.VolumeOfAsk[i]) * 100
  176. mk.Bids[i+5][0] = float64(ql2.PricesOfBid[i])
  177. mk.Bids[i+5][1] = float64(ql2.VolumeOfBid[i]) * 100
  178. }
  179. //ins.SetMk(mk)
  180. // log.Printf("@@@@@@@:%+v\n", mk)
  181. return mk
  182. }
  183. type MarketBoardInfo struct {
  184. Code [16]byte
  185. Info [48]byte
  186. }
  187. func (dds *DzhDS) resole(r io.Reader, count int, isL1 bool) ([]*QuoteL1, []*QuoteL2, error) {
  188. ql1s := make([]*QuoteL1, count)
  189. ql2s := make([]*QuoteL2, count)
  190. for i := 0; i < int(count); i++ {
  191. if isL1 {
  192. ql1 := &QuoteL1{}
  193. err := binary.Read(r, binary.LittleEndian, ql1)
  194. if err != nil {
  195. return nil, nil, errors.New("read QuoteL1 error:" + err.Error())
  196. }
  197. ql1s[i] = ql1
  198. } else {
  199. ql2 := &QuoteL2{}
  200. err := binary.Read(r, binary.LittleEndian, ql2)
  201. if err != nil {
  202. return nil, nil, errors.New("read QuoteL2 error:" + err.Error())
  203. }
  204. ql2s[i] = ql2
  205. }
  206. }
  207. return ql1s, ql2s, nil
  208. }
  209. func (dds *DzhDS) addIns(ext Ext) (*Instrument, string) {
  210. sid := b2s(ext.Code[:])
  211. exid := market.SHEX
  212. if strings.Contains(sid, market.SZEX) {
  213. exid = market.SZEX
  214. }
  215. sname := b2s(ext.Name[:])
  216. trans := simplifiedchinese.GBK.NewDecoder()
  217. dst := make([]byte, 1024)
  218. insIdStr := market.DzhPrefix + sid
  219. if len(sid) != 8 {
  220. log.Println("wrong sid", sid)
  221. return nil, ""
  222. }
  223. insId, _ := strconv.ParseInt(sid[2:], 10, 64)
  224. ins, ok := dds.insMap[insId]
  225. if !ok {
  226. nDst, _, err := trans.Transform(dst, []byte(sname), true)
  227. if err == nil {
  228. sname = string(dst[0:nDst])
  229. }
  230. ins = &Instrument{
  231. Id: insId,
  232. Name: sname,
  233. Type: market.Securities,
  234. ExId: exid,
  235. PriceInc: 0.01,
  236. }
  237. dds.insMap[insId] = ins
  238. //log.Println(ins)
  239. //dds.insPublisher.Publish(ins)
  240. }
  241. return ins, insIdStr
  242. }
  243. func (dds *DzhDS) resoleL1(r io.Reader, count int) error {
  244. ql1s, _, err := dds.resole(r, count, true)
  245. if err != nil {
  246. log.Println(err)
  247. return err
  248. }
  249. for _, q := range ql1s {
  250. ins, insIdStr := dds.addIns(q.Ext)
  251. if ins == nil {
  252. //log.Fatal("@@@: DzhDS.resoleL1 ins == nil")
  253. continue
  254. }
  255. mk := QuoteL1ToMD(q, ins, insIdStr)
  256. //ins.SetMk(mk)
  257. if mk.LastVolume == 0 {
  258. continue // 交易量为0, 不存储
  259. }
  260. dds.Save(mk)
  261. }
  262. return nil
  263. }
  264. func b2s(b []byte) string {
  265. for i, c := range b {
  266. if c == 0 {
  267. return string(b[:i])
  268. }
  269. }
  270. return string(b[:])
  271. }
  272. func (dds *DzhDS) resoleL2(r io.Reader, count int) error {
  273. _, ql2s, err := dds.resole(r, count, false)
  274. if err != nil {
  275. log.Println(err)
  276. return err
  277. }
  278. for _, q := range ql2s {
  279. ins, _ := dds.addIns(q.Ext)
  280. if ins == nil {
  281. continue
  282. }
  283. mk := QuoteL2ToMD(q, ins)
  284. dds.Save(mk)
  285. }
  286. return nil
  287. }