ds_polo.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. /*
  4. #include <string.h>
  5. */
  6. import "C"
  7. // 本文件实现polo数据源接口, 实时数据和历史数据的获取和保存
  8. import (
  9. "errors"
  10. "io/ioutil"
  11. "log"
  12. "net/http"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "tickserver/markinfo"
  17. "tickserver/server/market"
  18. "github.com/jcelliott/turnpike"
  19. )
  20. var poloInss = []int{
  21. markinfo.BTCCNY,
  22. markinfo.ETHCNY,
  23. markinfo.ETCCNY,
  24. }
  25. // PoloDS实现了dataSource接口, 并对polo的历史数据和实时数据保存
  26. type PoloDS struct {
  27. *DSBase
  28. conf *DsConf
  29. client *turnpike.Client
  30. lastTime time.Time
  31. usdcny float64
  32. }
  33. func init() {
  34. drivers[Polo] = newPoloDS
  35. }
  36. func newPoloDS(conf *DsConf) (DataSource, error) {
  37. pds := &PoloDS{
  38. DSBase: NewDsBase(conf),
  39. conf: conf,
  40. }
  41. pds.insMap = poloInsMap()
  42. var err error
  43. pds.usdcny, err = getUsdCny()
  44. if err != nil || pds.usdcny < 1 || pds.usdcny > 100 {
  45. return nil, err
  46. } else {
  47. log.Println("usdcny:", pds.usdcny)
  48. }
  49. return pds, nil
  50. }
  51. func (pds *PoloDS) Name() string {
  52. return Polo
  53. }
  54. func (pds *PoloDS) Run() {
  55. log.Println("PoloDS.Run")
  56. go func() {
  57. var err error
  58. for {
  59. time.Sleep(time.Second * 10)
  60. pds.usdcny, err = getUsdCny()
  61. if err != nil {
  62. log.Println(err)
  63. } else {
  64. log.Println("usdcny:", pds.usdcny)
  65. }
  66. }
  67. }()
  68. err := pds.connect()
  69. if err != nil {
  70. log.Println("polods run:", err)
  71. } else {
  72. pds.lastTime = time.Now()
  73. }
  74. go pds.checkConnection()
  75. }
  76. func (pds *PoloDS) checkConnection() {
  77. for {
  78. time.Sleep(1 * time.Minute)
  79. if time.Now().Sub(pds.lastTime) >= 2*time.Minute {
  80. err := pds.connect()
  81. if err != nil {
  82. log.Println(err)
  83. } else {
  84. pds.lastTime = time.Now()
  85. }
  86. }
  87. }
  88. }
  89. func (pds *PoloDS) process(args []interface{}, kwargs map[string]interface{}) {
  90. pds.lastTime = time.Now()
  91. var hrLowStr, hrHighStr, lastStr, currencyPairStr, lowestAskStr, highestBidStr, baseVolumeStr, quoteVolumeStr string
  92. for i, v := range args {
  93. s, ok := v.([]byte)
  94. if ok {
  95. switch i {
  96. case 0:
  97. currencyPairStr = string(s)
  98. case 1:
  99. lastStr = string(s)
  100. case 2:
  101. lowestAskStr = string(s)
  102. case 3:
  103. highestBidStr = string(s)
  104. //case 4:
  105. //percentChangeStr = string(s)
  106. case 5:
  107. baseVolumeStr = string(s)
  108. case 6:
  109. quoteVolumeStr = string(s)
  110. //case 7:
  111. //isFrozenStr = string(s)
  112. case 8:
  113. hrHighStr = string(s)
  114. case 9:
  115. hrLowStr = string(s)
  116. }
  117. if (currencyPairStr == "USDT_BTC" || currencyPairStr == "USDT_ETH" || currencyPairStr == "USDT_ETC") && len(lowestAskStr) > 0 && len(highestBidStr) > 0 {
  118. mk := &Market{}
  119. mk.Type = IntPolo
  120. if currencyPairStr == "USDT_BTC" {
  121. mk.InsId = markinfo.BTCCNY
  122. }
  123. if currencyPairStr == "USDT_ETH" {
  124. mk.InsId = markinfo.ETHCNY
  125. }
  126. if currencyPairStr == "USDT_ETC" {
  127. mk.InsId = markinfo.ETCCNY
  128. }
  129. now := time.Now()
  130. mk.Timestamp = now.Unix()*int64(1000) + int64(now.Nanosecond()/1000000)
  131. ask64, _ := strconv.ParseFloat(lowestAskStr, 64)
  132. ask64 *= pds.usdcny
  133. bid64, _ := strconv.ParseFloat(highestBidStr, 64)
  134. bid64 *= pds.usdcny
  135. basev, _ := strconv.ParseFloat(baseVolumeStr, 64)
  136. quotev, _ := strconv.ParseFloat(quoteVolumeStr, 64)
  137. last, _ := strconv.ParseFloat(lastStr, 64)
  138. last *= pds.usdcny
  139. high, _ := strconv.ParseFloat(hrHighStr, 64)
  140. high *= pds.usdcny
  141. low, _ := strconv.ParseFloat(hrLowStr, 64)
  142. low *= pds.usdcny
  143. var ask, bid PP
  144. ask[0] = ask64
  145. ask[1] = basev
  146. bid[0] = bid64
  147. bid[1] = basev
  148. mk.Asks = append(mk.Asks, ask)
  149. mk.Bids = append(mk.Bids, bid)
  150. mk.High = high
  151. mk.LastPrice = last
  152. mk.Low = low
  153. mk.AllVolume = quotev
  154. pds.Save(mk)
  155. }
  156. }
  157. }
  158. }
  159. func (pds *PoloDS) connect() (err error) {
  160. if pds.client != nil {
  161. pds.client.Close()
  162. }
  163. for i := 0; i < 10; i++ {
  164. //pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com", nil)
  165. pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com")
  166. if err == nil {
  167. break
  168. }
  169. time.Sleep(1 * time.Minute)
  170. }
  171. if err != nil {
  172. log.Println("NewWebsocketClient", err)
  173. return err
  174. }
  175. _, err = pds.client.JoinRealm("realm1", nil)
  176. if err != nil {
  177. log.Println("JoinRealm", err)
  178. return err
  179. }
  180. err = pds.client.Subscribe("ticker", pds.process)
  181. if err != nil {
  182. log.Println("Subscribe", err)
  183. return err
  184. }
  185. return nil
  186. }
  187. func poloInsMap() map[int64]*Instrument {
  188. insMap := make(map[int64]*Instrument)
  189. for _, id := range poloInss {
  190. x, _ := markinfo.SymbolName(id)
  191. u, _ := markinfo.SymbolUint(x)
  192. ins := &Instrument{
  193. Id: int64(id),
  194. Name: x,
  195. ExId: market.Polo,
  196. Type: market.Btcs,
  197. PriceInc: u,
  198. StartTime: time.Now().Unix() * 1000,
  199. }
  200. insMap[int64(id)] = ins
  201. }
  202. return insMap
  203. }
  204. func getUsdCny() (float64, error) {
  205. var price float64
  206. url := "http://hq.sinajs.cn/rn=1488788247745list=fx_susdcny"
  207. response, err := http.Get(url)
  208. if err != nil {
  209. return price, err
  210. }
  211. defer response.Body.Close()
  212. body, err := ioutil.ReadAll(response.Body)
  213. if err != nil {
  214. return price, err
  215. }
  216. strs := strings.Split(string(body), ",")
  217. if len(strs) > 1 {
  218. price, err = strconv.ParseFloat(strs[1], 32)
  219. if err != nil {
  220. return price, err
  221. }
  222. } else {
  223. return price, errors.New("invalid data")
  224. }
  225. return price, nil
  226. }