ds_saxo.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. // +build linux
  2. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  3. package tick
  4. // 本文件实现盛宝saxo数据源接口, 实时数据和历史数据的获取和保存
  5. import (
  6. "encoding/csv"
  7. "errors"
  8. "log"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "tickserver/api/saxocgo"
  14. "tickserver/markinfo"
  15. "tickserver/server/market"
  16. )
  17. type InsInfo struct {
  18. Id string
  19. Name string
  20. ExId string
  21. PriceInc float64
  22. Margin float64
  23. StartTime string
  24. }
  25. // SaxoFixDS实现了dataSource接口, 并对fix的历史数据和实时数据保存
  26. type SaxoFixDS struct {
  27. *DSBase
  28. conf *DsConf
  29. symbolMap map[string]string
  30. //insMap map[int64]*market.Instrument
  31. }
  32. func init() {
  33. drivers[Saxo] = newSaxoDS
  34. }
  35. func newSaxoDS(conf *DsConf) (DataSource, error) {
  36. insMap, symbolMap, err := parseCSV(conf.SymbolsFile)
  37. if err != nil {
  38. return nil, err
  39. }
  40. sds := &SaxoFixDS{
  41. DSBase: NewDsBase(conf),
  42. conf: conf,
  43. symbolMap: symbolMap,
  44. //insMap: insMap,
  45. }
  46. sds.insMap = insMap
  47. return sds, nil
  48. }
  49. func parseCSV(name string) (map[int64]*Instrument, map[string]string, error) {
  50. f, err := os.Open(name)
  51. if err != nil {
  52. return nil, nil, err
  53. }
  54. r := csv.NewReader(f)
  55. insMap := make(map[int64]*Instrument)
  56. symbolMap := make(map[string]string)
  57. first := true
  58. for {
  59. ss, err := r.Read()
  60. if err != nil {
  61. break
  62. }
  63. if len(ss) != 1 {
  64. continue
  65. }
  66. if first {
  67. first = false
  68. continue
  69. }
  70. s := strings.Trim(ss[0], " ")
  71. symbol := strings.Replace(s, "/", "", 1) // EUR/USD ==> EURUSD
  72. id, err := markinfo.SymbolId(symbol)
  73. if err != nil {
  74. log.Println(err)
  75. continue
  76. }
  77. unit, err := markinfo.SymbolUint(symbol)
  78. if err != nil {
  79. log.Println(err)
  80. continue
  81. }
  82. symbolMap[strconv.Itoa(id)] = s
  83. ins := &Instrument{
  84. Id: int64(id), //market.SaxoPrefix + symbol,
  85. Name: s,
  86. ExId: market.Saxo,
  87. PriceInc: unit,
  88. Type: market.Forex,
  89. StartTime: time.Date(2014, 12, 31, 0, 0, 0, 0, time.Local).Unix() * 1000,
  90. }
  91. id64 := int64(id)
  92. insMap[id64] = ins
  93. }
  94. return insMap, symbolMap, nil
  95. }
  96. //func (fds *SaxoFixDS) SubIns() *event.Event {
  97. //return fds.insPublisher.Event()
  98. //}
  99. func (fds *SaxoFixDS) Name() string {
  100. return Saxo
  101. }
  102. func (fds *SaxoFixDS) Run() {
  103. log.Println("SaxoFixDS.Run")
  104. //for _, ins := range fds.insMap {
  105. //log.Println("SaxoFixDS:", ins.Id, ins.Name)
  106. //fds.insPublisher.Publish(ins)
  107. //}
  108. //go fds.RunSave(16)
  109. fixApp := saxocgo.NewApp(fds.symbolMap, fds.conf.User, fds.conf.PassWord)
  110. cfgFile := fds.conf.CfgFile
  111. go fixApp.Run(cfgFile)
  112. for fixTick := range fixApp.Ch {
  113. m, err := fds.convMarket(fixTick)
  114. if err != nil {
  115. continue
  116. }
  117. ins, ok := fds.insMap[m.InsId]
  118. if !ok {
  119. log.Fatal("InsId is NOT in fds.insMap", m.InsId)
  120. }
  121. if m.Timestamp < ins.StartTime {
  122. log.Println("error: m.Timestamp < ins.StartTime:", m.Timestamp, ins.StartTime)
  123. continue
  124. }
  125. //ins.SetMk(m)
  126. fds.Save(m)
  127. }
  128. }
  129. func (fds *SaxoFixDS) convMarket(tick *saxocgo.FixTick) (*Market, error) {
  130. if tick.AskCount == 0 && tick.BidCount == 0 {
  131. return nil, errors.New("tick.AskCount==0 && tick.BidCount==0")
  132. }
  133. id := saxocgo.Symbol(tick.Symbol)
  134. symbol, ok := fds.symbolMap[id]
  135. if !ok {
  136. return nil, errors.New("tick.Symbol NOT in symbolMap: " + id)
  137. }
  138. symbol = strings.Replace(symbol, "/", "", 1) // EUR/USD ==> EURUSD
  139. //insId := market.SaxoPrefix + symbol
  140. insId, _ := markinfo.SymbolId(symbol)
  141. insId64 := int64(insId)
  142. _, ok = fds.insMap[insId64]
  143. if !ok {
  144. log.Fatal("InsId is NOT in fds.insMap", insId)
  145. }
  146. //mk := ins.GetMk()
  147. mk := &Market{}
  148. mk.InsId = insId64
  149. mk.Type = IntSaxo
  150. for len(mk.Asks) < int(tick.AskCount) {
  151. mk.Asks = append(mk.Asks, PP{})
  152. }
  153. for len(mk.Bids) < int(tick.BidCount) {
  154. mk.Bids = append(mk.Bids, PP{})
  155. }
  156. for i := 0; i < int(tick.AskCount); i++ {
  157. if tick.AskPrice[i] == 0 {
  158. mk.Asks[i][0] = mk.Asks[i][0]
  159. } else {
  160. mk.Asks[i][0] = tick.AskPrice[i]
  161. }
  162. mk.Asks[i][1] = float64(tick.AskVolume[i])
  163. }
  164. for i := 0; i < int(tick.BidCount); i++ {
  165. if tick.BidPrice[i] == 0 {
  166. mk.Bids[i][0] = mk.Bids[i][0]
  167. } else {
  168. mk.Bids[i][0] = tick.BidPrice[i]
  169. }
  170. mk.Bids[i][1] = float64(tick.BidVolume[i])
  171. }
  172. lastPrice := 0.
  173. vol := 0.
  174. if len(mk.Bids) > 0 {
  175. lastPrice = mk.Bids[0][0]
  176. vol = mk.Bids[0][1]
  177. }
  178. if lastPrice == 0. {
  179. if len(mk.Asks) > 0 {
  180. lastPrice = mk.Asks[0][0]
  181. vol = mk.Asks[0][1]
  182. }
  183. }
  184. mk.LastPrice = lastPrice
  185. mk.LastVolume = vol
  186. mk.Timestamp = int64(tick.Time)*1000 + int64(tick.Millisecond)
  187. return mk, nil
  188. }