okcoin_websocket_client.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package tick
  2. import "golang.org/x/net/websocket"
  3. import "fmt"
  4. import "log"
  5. import "time"
  6. import "tickserver/markinfo"
  7. type OKClient struct {
  8. origin string
  9. url string
  10. ws *websocket.Conn
  11. }
  12. func NewOKClient(orgin string, url string) *OKClient {
  13. okclient := &OKClient{}
  14. okclient.origin = orgin
  15. okclient.url = url
  16. return okclient
  17. }
  18. func (this *OKClient) Connect() (*websocket.Conn, error) {
  19. var ws *websocket.Conn
  20. var err error
  21. for i := 0; i < 10; i++ {
  22. ws, err = websocket.Dial(this.url, "", this.origin)
  23. if err != nil {
  24. fmt.Println(err)
  25. time.Sleep(time.Second * 2)
  26. } else {
  27. break
  28. }
  29. }
  30. if err != nil {
  31. log.Println("Connect", err)
  32. return ws, err
  33. }
  34. fmt.Println("ok")
  35. this.ws = ws
  36. return ws, err
  37. }
  38. func (this *OKClient) Send(strMessage string) error {
  39. if this.ws != nil {
  40. //websocket.Message.Receive(this.ws, v)
  41. if _, err := this.ws.Write([]byte(strMessage)); err != nil {
  42. log.Println("Send", strMessage, err)
  43. return err
  44. }
  45. }
  46. return nil
  47. }
  48. /*
  49. func (this *OKClient) JsonSend(strMessage string) error {
  50. if this.ws != nil {
  51. err := websocket.JSON.Send(this.ws, &msg)
  52. if _, err := this.ws.Write([]byte(strMessage)); err != nil {
  53. log.Fatal(err)
  54. return err
  55. }
  56. }
  57. return nil
  58. }
  59. */
  60. func (this *OKClient) Close() error {
  61. if this.ws != nil {
  62. this.ws.Close()
  63. this.ws = nil
  64. }
  65. return nil
  66. }
  67. func (this *OKClient) Rev(bMessage []byte) (int, error) {
  68. if this.ws != nil {
  69. n := -1
  70. var err error
  71. if n, err = this.ws.Read(bMessage); err != nil {
  72. log.Fatal("Rev", err)
  73. return -1, err
  74. }
  75. return n, nil
  76. }
  77. return -1, nil
  78. }
  79. func (this *OKClient) RevString(strMessage *string) error {
  80. if this.ws != nil {
  81. var err error
  82. if err = websocket.Message.Receive(this.ws, strMessage); err != nil {
  83. return err
  84. }
  85. return nil
  86. }
  87. return nil
  88. }
  89. const (
  90. BTC_REAL = iota
  91. BTC_HIS
  92. BTC_F_REAL
  93. BTC_F_HIS
  94. )
  95. type OKClientWrapper struct {
  96. okclient *OKClient
  97. datatyp int
  98. datastr string
  99. lastsec int64
  100. symbol int
  101. }
  102. func NewOKClientWrapper(orgin string, url string, datatyp, symbol int) (*OKClientWrapper, error) {
  103. okclient := NewOKClient(orgin, url)
  104. okclient.Connect()
  105. okwrapper := &OKClientWrapper{}
  106. okwrapper.okclient = okclient
  107. okwrapper.datatyp = datatyp
  108. okwrapper.symbol = symbol
  109. go okwrapper.DoHeartbeat()
  110. return okwrapper, nil
  111. }
  112. func (this *OKClientWrapper) Emit(channel string) error {
  113. cmd := "{'event':'addChannel','channel':'"
  114. cmd += channel
  115. cmd += "'}"
  116. return this.okclient.Send(cmd)
  117. }
  118. func (this *OKClientWrapper) Emitex(channel string, parameter string) error {
  119. cmd := "{'event':'addChannel','channel':'"
  120. cmd += channel
  121. cmd += "','parameters':{" //{
  122. cmd += parameter
  123. cmd += "}}" //}
  124. // cmd := "{'event':'addChannel','channel':'ok_spotcny_trade','parameters':{'api_key':'3a49ab2a-01e0-4f4d-93a5-f3399edcf84a','sign':'B5DA5241F526E1159D13C8EBE6893FEB','symbol':'btc_cny','type':'sell','price':'2180','amount':'0.001'}}"
  125. fmt.Println("sdssss", cmd)
  126. return this.okclient.Send(cmd)
  127. }
  128. func (this *OKClientWrapper) Remove(channel string) error {
  129. cmd := "{'event':'removeChannel','channel':'"
  130. cmd += channel
  131. cmd += "'}"
  132. return this.okclient.Send(cmd)
  133. }
  134. func (this *OKClientWrapper) Ping() error {
  135. cmd := "{'event':'ping'}"
  136. return this.okclient.Send(cmd)
  137. }
  138. func (this *OKClientWrapper) DoHeartbeat() {
  139. for {
  140. this.Ping()
  141. time.Sleep(time.Second * 5)
  142. if time.Now().Unix()-this.lastsec > 15 {
  143. log.Println(this.symbol, "time out!")
  144. this.okclient.Close()
  145. time.Sleep(time.Second * 2)
  146. this.okclient.Connect()
  147. time.Sleep(time.Second * 3)
  148. this.ReEmit()
  149. }
  150. }
  151. }
  152. func (this *OKClientWrapper) ReEmit() {
  153. switch this.datatyp {
  154. case BTC_REAL:
  155. if this.symbol == markinfo.BTCCNY {
  156. this.Emit("ok_btccny_ticker")
  157. } else {
  158. this.Emit("ok_btcusd_ticker")
  159. }
  160. case BTC_HIS:
  161. if this.symbol == markinfo.BTCCNY {
  162. this.Emit("ok_btccny_kline_1min")
  163. } else {
  164. this.Emit("ok_btcusd_kline_1min")
  165. }
  166. case BTC_F_REAL:
  167. this.Emit("ok_btcusd_future_ticker_this_week")
  168. case BTC_F_HIS:
  169. this.Emit("ok_btcusd_kline_this_week_1min")
  170. }
  171. }