123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- /*
- #include <string.h>
- */
- import "C"
- // 本文件实现okcoin数据源接口, 实时数据和历史数据的获取和保存
- import (
- "encoding/json"
- "log"
- "strconv"
- "time"
- "tickserver/markinfo"
- "tickserver/server/market"
- )
- const (
- ClientCount = 3
- )
- var btcInss = []int{
- markinfo.BTCUSD,
- markinfo.BTCCNY,
- markinfo.BTCFUSD,
- }
- type BTCTickerWS struct {
- Buy float64 `json:"buy"`
- High float64 `json:"high"`
- Last string `json:"last"`
- Low float64 `json:"low"`
- Sell float64 `json:"sell"`
- Timestamp string `json:"timestamp"`
- Vol string `json:"vol"`
- }
- type BTCTickWS struct {
- Channel string `json:"channel"`
- Data BTCTickerWS `json:"data"`
- }
- type BTCTickerWSFirst struct {
- Buy string `json:"buy"`
- High string `json:"high"`
- Last string `json:"last"`
- Low string `json:"low"`
- Sell string `json:"sell"`
- Timestamp string `json:"timestamp"`
- Vol string `json:"vol"`
- }
- type BTCTickWSFirst struct {
- Channel string `json:"channel"`
- Data BTCTickerWSFirst `json:"data"`
- }
- type BTCFTickerWS struct {
- Buy float64 `json:"buy"`
- Contract_id string `json:"contract_id"`
- High float64 `json:"high"`
- Hold_amount float64 `json:"hold_amount"`
- Last string `json:"last"`
- Low float64 `json:"low"`
- Sell float64 `json:"sell"`
- UnitAmount float64 `json:"unitAmount"`
- Vol string `json:"vol"`
- }
- type BTCFTickWS struct {
- Channel string `json:"channel"`
- Data BTCFTickerWS `json:"data"`
- }
- // BtcDS实现了dataSource接口, 并对btc的历史数据和实时数据保存
- type BtcDS struct {
- *DSBase
- conf *DsConf
- wsclient [ClientCount]*OKClientWrapper
- }
- func init() {
- drivers[Btc] = newBtcDS
- }
- func newBtcDS(conf *DsConf) (DataSource, error) {
- bds := &BtcDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- }
- bds.insMap = btcInsMap()
- return bds, nil
- }
- func (bds *BtcDS) Name() string {
- return Btc
- }
- func (bds *BtcDS) Run() {
- log.Println("BtcDS.Run")
- var err error
- /*bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCUSD)
- if err != nil {
- log.Println("NewOKClientWrapper", err)
- }
- time.Sleep(time.Second * 3)
- err = bds.wsclient[0].Emit("ok_btcusd_ticker")
- if err != nil {
- log.Println("Emit", err)
- }
- go bds.ProcessData(0)*/
- bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.cn:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCCNY)
- if err != nil {
- log.Println("NewOKClientWrapper", err)
- }
- time.Sleep(time.Second * 3)
- err = bds.wsclient[0].Emit("ok_btccny_ticker")
- if err != nil {
- log.Println("Emit", err)
- }
- go bds.ProcessData(0)
- /*bds.wsclient[2], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_F_REAL, markinfo.BTCFUSD)
- if err != nil {
- log.Println("NewOKClientWrapper", err)
- }
- time.Sleep(time.Second * 3)
- err = bds.wsclient[2].Emit("ok_btcusd_future_ticker_this_week")
- if err != nil {
- log.Println("Emit", err)
- }
- go bds.ProcessData(2)*/
- }
- func (bds *BtcDS) ProcessData(index int) {
- for {
- err := bds.wsclient[index].okclient.RevString(&bds.wsclient[index].datastr)
- if err == nil {
- if bds.wsclient[index].datastr == "{\"event\":\"pong\"}" {
- bds.wsclient[index].lastsec = time.Now().Unix()
- continue
- }
- switch bds.wsclient[index].datatyp {
- case BTC_REAL:
- err = bds.parseBTCReal(index)
- if err != nil {
- log.Println("parseBTCReal", bds.wsclient[index].datastr, err)
- }
- case BTC_F_REAL:
- err = bds.parseBTCFReal(index)
- if err != nil {
- log.Println("parseBTCFReal", bds.wsclient[index].datastr, err)
- }
- default:
- log.Println("data type not supported:", bds.wsclient[index].datatyp)
- }
- } else {
- log.Println(bds.wsclient[index].symbol, "RevString", err)
- //bds.wsclient[index].okclient.Close()
- time.Sleep(time.Second * 2)
- //bds.wsclient[index].okclient.Connect()
- //time.Sleep(time.Second * 3)
- //bds.wsclient[index].ReEmit()
- }
- }
- }
- func (bds *BtcDS) parseBTCFReal(index int) error {
- var btcfticks []BTCFTickWS
- err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btcfticks)
- if err != nil {
- return err
- }
- for _, btcftick := range btcfticks {
- mk := &Market{}
- mk.Type = IntBtc
- mk.InsId = markinfo.BTCFUSD
- mk.Timestamp = time.Now().Unix() * 1000
- var ask, bid PP
- ask[0] = btcftick.Data.Sell
- ask[1] = btcftick.Data.Hold_amount
- bid[0] = btcftick.Data.Buy
- bid[1] = btcftick.Data.Hold_amount
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- mk.High = btcftick.Data.High
- mk.LastPrice, _ = strconv.ParseFloat(btcftick.Data.Last, 32)
- mk.Low = btcftick.Data.Low
- mk.AllAmount = btcftick.Data.UnitAmount
- mk.AllVolume, _ = strconv.ParseFloat(btcftick.Data.Vol, 32)
- bds.Save(mk)
- }
- return nil
- }
- func (bds *BtcDS) parseBTCReal(index int) error {
- var btctickwss []BTCTickWS
- err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwss)
- if err != nil {
- var btctickwssfirst []BTCTickWSFirst
- err = json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwssfirst)
- if err != nil {
- log.Println(bds.wsclient[index].datastr)
- return err
- }
- for _, btctickws := range btctickwssfirst {
- mk := &Market{}
- mk.Type = IntBtc
- mk.InsId = int64(bds.wsclient[index].symbol)
- mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64)
- if mk.Timestamp == 0 {
- return nil //丢弃非行情数据
- }
- var ask, bid PP
- ask[0], _ = strconv.ParseFloat(btctickws.Data.Sell, 32)
- bid[0], _ = strconv.ParseFloat(btctickws.Data.Buy, 32)
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- mk.High, _ = strconv.ParseFloat(btctickws.Data.High, 32)
- mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32)
- mk.Low, _ = strconv.ParseFloat(btctickws.Data.Low, 32)
- mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32)
- if mk.LastPrice != 0 {
- bds.Save(mk)
- }
- }
- return nil
- }
- for _, btctickws := range btctickwss {
- mk := &Market{}
- mk.Type = IntBtc
- mk.InsId = int64(bds.wsclient[index].symbol)
- mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64)
- if mk.Timestamp == 0 {
- return nil //丢弃非行情数据
- }
- var ask, bid PP
- ask[0] = btctickws.Data.Sell
- bid[0] = btctickws.Data.Buy
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- mk.High = btctickws.Data.High
- mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32)
- mk.Low = btctickws.Data.Low
- mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32)
- if mk.LastPrice != 0 {
- bds.Save(mk)
- }
- }
- return nil
- }
- func btcInsMap() map[int64]*Instrument {
- insMap := make(map[int64]*Instrument)
- for _, id := range btcInss {
- x, _ := markinfo.SymbolName(id)
- u, _ := markinfo.SymbolUint(x)
- ins := &Instrument{
- Id: int64(id),
- Name: x,
- ExId: market.Btc,
- Type: market.Btcs,
- PriceInc: u,
- StartTime: time.Now().Unix() * 1000,
- }
- insMap[int64(id)] = ins
- }
- return insMap
- }
|