123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- /*
- #include <string.h>
- */
- import "C"
// This file implements the polo data source interface: fetching and saving real-time and historical data.
- import (
- "errors"
- "io/ioutil"
- "log"
- "net/http"
- "strconv"
- "strings"
- "time"
- "tickserver/markinfo"
- "tickserver/server/market"
- "github.com/jcelliott/turnpike"
- )
- var poloInss = []int{
- markinfo.BTCCNY,
- markinfo.ETHCNY,
- markinfo.ETCCNY,
- }
- // PoloDS实现了dataSource接口, 并对polo的历史数据和实时数据保存
- type PoloDS struct {
- *DSBase
- conf *DsConf
- client *turnpike.Client
- lastTime time.Time
- usdcny float64
- }
- func init() {
- drivers[Polo] = newPoloDS
- }
- func newPoloDS(conf *DsConf) (DataSource, error) {
- pds := &PoloDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- }
- pds.insMap = poloInsMap()
- var err error
- pds.usdcny, err = getUsdCny()
- if err != nil || pds.usdcny < 1 || pds.usdcny > 100 {
- return nil, err
- } else {
- log.Println("usdcny:", pds.usdcny)
- }
- return pds, nil
- }
- func (pds *PoloDS) Name() string {
- return Polo
- }
- func (pds *PoloDS) Run() {
- log.Println("PoloDS.Run")
- go func() {
- var err error
- for {
- time.Sleep(time.Second * 10)
- pds.usdcny, err = getUsdCny()
- if err != nil {
- log.Println(err)
- } else {
- log.Println("usdcny:", pds.usdcny)
- }
- }
- }()
- err := pds.connect()
- if err != nil {
- log.Println("polods run:", err)
- } else {
- pds.lastTime = time.Now()
- }
- go pds.checkConnection()
- }
- func (pds *PoloDS) checkConnection() {
- for {
- time.Sleep(1 * time.Minute)
- if time.Now().Sub(pds.lastTime) >= 2*time.Minute {
- err := pds.connect()
- if err != nil {
- log.Println(err)
- } else {
- pds.lastTime = time.Now()
- }
- }
- }
- }
- func (pds *PoloDS) process(args []interface{}, kwargs map[string]interface{}) {
- pds.lastTime = time.Now()
- var hrLowStr, hrHighStr, lastStr, currencyPairStr, lowestAskStr, highestBidStr, baseVolumeStr, quoteVolumeStr string
- for i, v := range args {
- s, ok := v.([]byte)
- if ok {
- switch i {
- case 0:
- currencyPairStr = string(s)
- case 1:
- lastStr = string(s)
- case 2:
- lowestAskStr = string(s)
- case 3:
- highestBidStr = string(s)
- //case 4:
- //percentChangeStr = string(s)
- case 5:
- baseVolumeStr = string(s)
- case 6:
- quoteVolumeStr = string(s)
- //case 7:
- //isFrozenStr = string(s)
- case 8:
- hrHighStr = string(s)
- case 9:
- hrLowStr = string(s)
- }
- if (currencyPairStr == "USDT_BTC" || currencyPairStr == "USDT_ETH" || currencyPairStr == "USDT_ETC") && len(lowestAskStr) > 0 && len(highestBidStr) > 0 {
- mk := &Market{}
- mk.Type = IntPolo
- if currencyPairStr == "USDT_BTC" {
- mk.InsId = markinfo.BTCCNY
- }
- if currencyPairStr == "USDT_ETH" {
- mk.InsId = markinfo.ETHCNY
- }
- if currencyPairStr == "USDT_ETC" {
- mk.InsId = markinfo.ETCCNY
- }
- now := time.Now()
- mk.Timestamp = now.Unix()*int64(1000) + int64(now.Nanosecond()/1000000)
- ask64, _ := strconv.ParseFloat(lowestAskStr, 64)
- ask64 *= pds.usdcny
- bid64, _ := strconv.ParseFloat(highestBidStr, 64)
- bid64 *= pds.usdcny
- basev, _ := strconv.ParseFloat(baseVolumeStr, 64)
- quotev, _ := strconv.ParseFloat(quoteVolumeStr, 64)
- last, _ := strconv.ParseFloat(lastStr, 64)
- last *= pds.usdcny
- high, _ := strconv.ParseFloat(hrHighStr, 64)
- high *= pds.usdcny
- low, _ := strconv.ParseFloat(hrLowStr, 64)
- low *= pds.usdcny
- var ask, bid PP
- ask[0] = ask64
- ask[1] = basev
- bid[0] = bid64
- bid[1] = basev
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- mk.High = high
- mk.LastPrice = last
- mk.Low = low
- mk.AllVolume = quotev
- pds.Save(mk)
- }
- }
- }
- }
- func (pds *PoloDS) connect() (err error) {
- if pds.client != nil {
- pds.client.Close()
- }
- for i := 0; i < 10; i++ {
- //pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com", nil)
- pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com")
- if err == nil {
- break
- }
- time.Sleep(1 * time.Minute)
- }
- if err != nil {
- log.Println("NewWebsocketClient", err)
- return err
- }
- _, err = pds.client.JoinRealm("realm1", nil)
- if err != nil {
- log.Println("JoinRealm", err)
- return err
- }
- err = pds.client.Subscribe("ticker", pds.process)
- if err != nil {
- log.Println("Subscribe", err)
- return err
- }
- return nil
- }
- func poloInsMap() map[int64]*Instrument {
- insMap := make(map[int64]*Instrument)
- for _, id := range poloInss {
- x, _ := markinfo.SymbolName(id)
- u, _ := markinfo.SymbolUint(x)
- ins := &Instrument{
- Id: int64(id),
- Name: x,
- ExId: market.Polo,
- Type: market.Btcs,
- PriceInc: u,
- StartTime: time.Now().Unix() * 1000,
- }
- insMap[int64(id)] = ins
- }
- return insMap
- }
// getUsdCny fetches the USD/CNY rate from hq.sinajs.cn and returns it.
//
// The request is bounded by a ten-second timeout so a stalled server cannot
// block the caller indefinitely. A non-200 response, a read failure, or a
// body that cannot be parsed yields (0, err); the returned value is only
// meaningful when err is nil.
func getUsdCny() (float64, error) {
	// NOTE(review): the path carries "rn=...list=..." with no '?' or '&'
	// separators; it is reproduced unchanged - confirm against the endpoint.
	const url = "http://hq.sinajs.cn/rn=1488788247745list=fx_susdcny"

	client := &http.Client{Timeout: 10 * time.Second}
	response, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		return 0, errors.New("usdcny: unexpected status " + response.Status)
	}

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return 0, err
	}
	return parseUsdCny(string(body))
}

// parseUsdCny extracts the rate from the second comma-separated field of
// body. It parses at full float64 precision and returns (0, err) when the
// field is missing or not a number.
func parseUsdCny(body string) (float64, error) {
	fields := strings.Split(body, ",")
	if len(fields) < 2 {
		return 0, errors.New("invalid data")
	}
	price, err := strconv.ParseFloat(strings.TrimSpace(fields[1]), 64)
	if err != nil {
		return 0, err
	}
	return price, nil
}
|