ds_btc.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. /*
  4. #include <string.h>
  5. */
  6. import "C"
  7. // 本文件实现okcoin数据源接口, 实时数据和历史数据的获取和保存
  8. import (
  9. "encoding/json"
  10. "log"
  11. "strconv"
  12. "time"
  13. "tickserver/markinfo"
  14. "tickserver/server/market"
  15. )
  16. const (
  17. ClientCount = 3
  18. )
  19. var btcInss = []int{
  20. markinfo.BTCUSD,
  21. markinfo.BTCCNY,
  22. markinfo.BTCFUSD,
  23. }
  24. type BTCTickerWS struct {
  25. Buy float64 `json:"buy"`
  26. High float64 `json:"high"`
  27. Last string `json:"last"`
  28. Low float64 `json:"low"`
  29. Sell float64 `json:"sell"`
  30. Timestamp string `json:"timestamp"`
  31. Vol string `json:"vol"`
  32. }
  33. type BTCTickWS struct {
  34. Channel string `json:"channel"`
  35. Data BTCTickerWS `json:"data"`
  36. }
  37. type BTCTickerWSFirst struct {
  38. Buy string `json:"buy"`
  39. High string `json:"high"`
  40. Last string `json:"last"`
  41. Low string `json:"low"`
  42. Sell string `json:"sell"`
  43. Timestamp string `json:"timestamp"`
  44. Vol string `json:"vol"`
  45. }
  46. type BTCTickWSFirst struct {
  47. Channel string `json:"channel"`
  48. Data BTCTickerWSFirst `json:"data"`
  49. }
  50. type BTCFTickerWS struct {
  51. Buy float64 `json:"buy"`
  52. Contract_id string `json:"contract_id"`
  53. High float64 `json:"high"`
  54. Hold_amount float64 `json:"hold_amount"`
  55. Last string `json:"last"`
  56. Low float64 `json:"low"`
  57. Sell float64 `json:"sell"`
  58. UnitAmount float64 `json:"unitAmount"`
  59. Vol string `json:"vol"`
  60. }
  61. type BTCFTickWS struct {
  62. Channel string `json:"channel"`
  63. Data BTCFTickerWS `json:"data"`
  64. }
  65. // BtcDS实现了dataSource接口, 并对btc的历史数据和实时数据保存
  66. type BtcDS struct {
  67. *DSBase
  68. conf *DsConf
  69. wsclient [ClientCount]*OKClientWrapper
  70. }
  71. func init() {
  72. drivers[Btc] = newBtcDS
  73. }
  74. func newBtcDS(conf *DsConf) (DataSource, error) {
  75. bds := &BtcDS{
  76. DSBase: NewDsBase(conf),
  77. conf: conf,
  78. }
  79. bds.insMap = btcInsMap()
  80. return bds, nil
  81. }
  82. func (bds *BtcDS) Name() string {
  83. return Btc
  84. }
  85. func (bds *BtcDS) Run() {
  86. log.Println("BtcDS.Run")
  87. var err error
  88. /*bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCUSD)
  89. if err != nil {
  90. log.Println("NewOKClientWrapper", err)
  91. }
  92. time.Sleep(time.Second * 3)
  93. err = bds.wsclient[0].Emit("ok_btcusd_ticker")
  94. if err != nil {
  95. log.Println("Emit", err)
  96. }
  97. go bds.ProcessData(0)*/
  98. bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.cn:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCCNY)
  99. if err != nil {
  100. log.Println("NewOKClientWrapper", err)
  101. }
  102. time.Sleep(time.Second * 3)
  103. err = bds.wsclient[0].Emit("ok_btccny_ticker")
  104. if err != nil {
  105. log.Println("Emit", err)
  106. }
  107. go bds.ProcessData(0)
  108. /*bds.wsclient[2], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_F_REAL, markinfo.BTCFUSD)
  109. if err != nil {
  110. log.Println("NewOKClientWrapper", err)
  111. }
  112. time.Sleep(time.Second * 3)
  113. err = bds.wsclient[2].Emit("ok_btcusd_future_ticker_this_week")
  114. if err != nil {
  115. log.Println("Emit", err)
  116. }
  117. go bds.ProcessData(2)*/
  118. }
  119. func (bds *BtcDS) ProcessData(index int) {
  120. for {
  121. err := bds.wsclient[index].okclient.RevString(&bds.wsclient[index].datastr)
  122. if err == nil {
  123. if bds.wsclient[index].datastr == "{\"event\":\"pong\"}" {
  124. bds.wsclient[index].lastsec = time.Now().Unix()
  125. continue
  126. }
  127. switch bds.wsclient[index].datatyp {
  128. case BTC_REAL:
  129. err = bds.parseBTCReal(index)
  130. if err != nil {
  131. log.Println("parseBTCReal", bds.wsclient[index].datastr, err)
  132. }
  133. case BTC_F_REAL:
  134. err = bds.parseBTCFReal(index)
  135. if err != nil {
  136. log.Println("parseBTCFReal", bds.wsclient[index].datastr, err)
  137. }
  138. default:
  139. log.Println("data type not supported:", bds.wsclient[index].datatyp)
  140. }
  141. } else {
  142. log.Println(bds.wsclient[index].symbol, "RevString", err)
  143. //bds.wsclient[index].okclient.Close()
  144. time.Sleep(time.Second * 2)
  145. //bds.wsclient[index].okclient.Connect()
  146. //time.Sleep(time.Second * 3)
  147. //bds.wsclient[index].ReEmit()
  148. }
  149. }
  150. }
  151. func (bds *BtcDS) parseBTCFReal(index int) error {
  152. var btcfticks []BTCFTickWS
  153. err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btcfticks)
  154. if err != nil {
  155. return err
  156. }
  157. for _, btcftick := range btcfticks {
  158. mk := &Market{}
  159. mk.Type = IntBtc
  160. mk.InsId = markinfo.BTCFUSD
  161. mk.Timestamp = time.Now().Unix() * 1000
  162. var ask, bid PP
  163. ask[0] = btcftick.Data.Sell
  164. ask[1] = btcftick.Data.Hold_amount
  165. bid[0] = btcftick.Data.Buy
  166. bid[1] = btcftick.Data.Hold_amount
  167. mk.Asks = append(mk.Asks, ask)
  168. mk.Bids = append(mk.Bids, bid)
  169. mk.High = btcftick.Data.High
  170. mk.LastPrice, _ = strconv.ParseFloat(btcftick.Data.Last, 32)
  171. mk.Low = btcftick.Data.Low
  172. mk.AllAmount = btcftick.Data.UnitAmount
  173. mk.AllVolume, _ = strconv.ParseFloat(btcftick.Data.Vol, 32)
  174. bds.Save(mk)
  175. }
  176. return nil
  177. }
  178. func (bds *BtcDS) parseBTCReal(index int) error {
  179. var btctickwss []BTCTickWS
  180. err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwss)
  181. if err != nil {
  182. var btctickwssfirst []BTCTickWSFirst
  183. err = json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwssfirst)
  184. if err != nil {
  185. log.Println(bds.wsclient[index].datastr)
  186. return err
  187. }
  188. for _, btctickws := range btctickwssfirst {
  189. mk := &Market{}
  190. mk.Type = IntBtc
  191. mk.InsId = int64(bds.wsclient[index].symbol)
  192. mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64)
  193. if mk.Timestamp == 0 {
  194. return nil //丢弃非行情数据
  195. }
  196. var ask, bid PP
  197. ask[0], _ = strconv.ParseFloat(btctickws.Data.Sell, 32)
  198. bid[0], _ = strconv.ParseFloat(btctickws.Data.Buy, 32)
  199. mk.Asks = append(mk.Asks, ask)
  200. mk.Bids = append(mk.Bids, bid)
  201. mk.High, _ = strconv.ParseFloat(btctickws.Data.High, 32)
  202. mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32)
  203. mk.Low, _ = strconv.ParseFloat(btctickws.Data.Low, 32)
  204. mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32)
  205. if mk.LastPrice != 0 {
  206. bds.Save(mk)
  207. }
  208. }
  209. return nil
  210. }
  211. for _, btctickws := range btctickwss {
  212. mk := &Market{}
  213. mk.Type = IntBtc
  214. mk.InsId = int64(bds.wsclient[index].symbol)
  215. mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64)
  216. if mk.Timestamp == 0 {
  217. return nil //丢弃非行情数据
  218. }
  219. var ask, bid PP
  220. ask[0] = btctickws.Data.Sell
  221. bid[0] = btctickws.Data.Buy
  222. mk.Asks = append(mk.Asks, ask)
  223. mk.Bids = append(mk.Bids, bid)
  224. mk.High = btctickws.Data.High
  225. mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32)
  226. mk.Low = btctickws.Data.Low
  227. mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32)
  228. if mk.LastPrice != 0 {
  229. bds.Save(mk)
  230. }
  231. }
  232. return nil
  233. }
  234. func btcInsMap() map[int64]*Instrument {
  235. insMap := make(map[int64]*Instrument)
  236. for _, id := range btcInss {
  237. x, _ := markinfo.SymbolName(id)
  238. u, _ := markinfo.SymbolUint(x)
  239. ins := &Instrument{
  240. Id: int64(id),
  241. Name: x,
  242. ExId: market.Btc,
  243. Type: market.Btcs,
  244. PriceInc: u,
  245. StartTime: time.Now().Unix() * 1000,
  246. }
  247. insMap[int64(id)] = ins
  248. }
  249. return insMap
  250. }