// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick /* #include */ import "C" // 本文件实现polo数据源接口, 实时数据和历史数据的获取和保存 import ( "errors" "io/ioutil" "log" "net/http" "strconv" "strings" "time" "tickserver/markinfo" "tickserver/server/market" "github.com/jcelliott/turnpike" ) var poloInss = []int{ markinfo.BTCCNY, markinfo.ETHCNY, markinfo.ETCCNY, } // PoloDS实现了dataSource接口, 并对polo的历史数据和实时数据保存 type PoloDS struct { *DSBase conf *DsConf client *turnpike.Client lastTime time.Time usdcny float64 } func init() { drivers[Polo] = newPoloDS } func newPoloDS(conf *DsConf) (DataSource, error) { pds := &PoloDS{ DSBase: NewDsBase(conf), conf: conf, } pds.insMap = poloInsMap() var err error pds.usdcny, err = getUsdCny() if err != nil || pds.usdcny < 1 || pds.usdcny > 100 { return nil, err } else { log.Println("usdcny:", pds.usdcny) } return pds, nil } func (pds *PoloDS) Name() string { return Polo } func (pds *PoloDS) Run() { log.Println("PoloDS.Run") go func() { var err error for { time.Sleep(time.Second * 10) pds.usdcny, err = getUsdCny() if err != nil { log.Println(err) } else { log.Println("usdcny:", pds.usdcny) } } }() err := pds.connect() if err != nil { log.Println("polods run:", err) } else { pds.lastTime = time.Now() } go pds.checkConnection() } func (pds *PoloDS) checkConnection() { for { time.Sleep(1 * time.Minute) if time.Now().Sub(pds.lastTime) >= 2*time.Minute { err := pds.connect() if err != nil { log.Println(err) } else { pds.lastTime = time.Now() } } } } func (pds *PoloDS) process(args []interface{}, kwargs map[string]interface{}) { pds.lastTime = time.Now() var hrLowStr, hrHighStr, lastStr, currencyPairStr, lowestAskStr, highestBidStr, baseVolumeStr, quoteVolumeStr string for i, v := range args { s, ok := v.([]byte) if ok { switch i { case 0: currencyPairStr = string(s) case 1: lastStr = string(s) case 2: lowestAskStr = string(s) case 3: highestBidStr = string(s) //case 4: //percentChangeStr = string(s) case 5: baseVolumeStr = string(s) case 6: quoteVolumeStr = string(s) //case 7: //isFrozenStr = string(s) case 8: hrHighStr = string(s) case 9: hrLowStr = string(s) } if (currencyPairStr == "USDT_BTC" || currencyPairStr == "USDT_ETH" || currencyPairStr == "USDT_ETC") && len(lowestAskStr) > 0 && len(highestBidStr) > 0 { mk := &Market{} mk.Type = IntPolo if currencyPairStr == "USDT_BTC" { mk.InsId = markinfo.BTCCNY } if currencyPairStr == "USDT_ETH" { mk.InsId = markinfo.ETHCNY } if currencyPairStr == "USDT_ETC" { mk.InsId = markinfo.ETCCNY } now := time.Now() mk.Timestamp = now.Unix()*int64(1000) + int64(now.Nanosecond()/1000000) ask64, _ := strconv.ParseFloat(lowestAskStr, 64) ask64 *= pds.usdcny bid64, _ := strconv.ParseFloat(highestBidStr, 64) bid64 *= pds.usdcny basev, _ := strconv.ParseFloat(baseVolumeStr, 64) quotev, _ := strconv.ParseFloat(quoteVolumeStr, 64) last, _ := strconv.ParseFloat(lastStr, 64) last *= pds.usdcny high, _ := strconv.ParseFloat(hrHighStr, 64) high *= pds.usdcny low, _ := strconv.ParseFloat(hrLowStr, 64) low *= pds.usdcny var ask, bid PP ask[0] = ask64 ask[1] = basev bid[0] = bid64 bid[1] = basev mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) mk.High = high mk.LastPrice = last mk.Low = low mk.AllVolume = quotev pds.Save(mk) } } } } func (pds *PoloDS) connect() (err error) { if pds.client != nil { pds.client.Close() } for i := 0; i < 10; i++ { //pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com", nil) pds.client, err = turnpike.NewWebsocketClient(turnpike.MSGPACK, "wss://api.poloniex.com") if err == nil { break } time.Sleep(1 * time.Minute) } if err != nil { log.Println("NewWebsocketClient", err) return err } _, err = pds.client.JoinRealm("realm1", nil) if err != nil { log.Println("JoinRealm", err) return err } err = pds.client.Subscribe("ticker", pds.process) if err != nil { log.Println("Subscribe", err) return err } return nil } func poloInsMap() map[int64]*Instrument { insMap := make(map[int64]*Instrument) for _, id := range poloInss { x, _ := markinfo.SymbolName(id) u, _ := markinfo.SymbolUint(x) ins := &Instrument{ Id: int64(id), Name: x, ExId: market.Polo, Type: market.Btcs, PriceInc: u, StartTime: time.Now().Unix() * 1000, } insMap[int64(id)] = ins } return insMap } func getUsdCny() (float64, error) { var price float64 url := "http://hq.sinajs.cn/rn=1488788247745list=fx_susdcny" response, err := http.Get(url) if err != nil { return price, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { return price, err } strs := strings.Split(string(body), ",") if len(strs) > 1 { price, err = strconv.ParseFloat(strs[1], 32) if err != nil { return price, err } } else { return price, errors.New("invalid data") } return price, nil }