// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick /* #include */ import "C" // 本文件实现okcoin数据源接口, 实时数据和历史数据的获取和保存 import ( "encoding/json" "log" "strconv" "time" "tickserver/markinfo" "tickserver/server/market" ) const ( ClientCount = 3 ) var btcInss = []int{ markinfo.BTCUSD, markinfo.BTCCNY, markinfo.BTCFUSD, } type BTCTickerWS struct { Buy float64 `json:"buy"` High float64 `json:"high"` Last string `json:"last"` Low float64 `json:"low"` Sell float64 `json:"sell"` Timestamp string `json:"timestamp"` Vol string `json:"vol"` } type BTCTickWS struct { Channel string `json:"channel"` Data BTCTickerWS `json:"data"` } type BTCTickerWSFirst struct { Buy string `json:"buy"` High string `json:"high"` Last string `json:"last"` Low string `json:"low"` Sell string `json:"sell"` Timestamp string `json:"timestamp"` Vol string `json:"vol"` } type BTCTickWSFirst struct { Channel string `json:"channel"` Data BTCTickerWSFirst `json:"data"` } type BTCFTickerWS struct { Buy float64 `json:"buy"` Contract_id string `json:"contract_id"` High float64 `json:"high"` Hold_amount float64 `json:"hold_amount"` Last string `json:"last"` Low float64 `json:"low"` Sell float64 `json:"sell"` UnitAmount float64 `json:"unitAmount"` Vol string `json:"vol"` } type BTCFTickWS struct { Channel string `json:"channel"` Data BTCFTickerWS `json:"data"` } // BtcDS实现了dataSource接口, 并对btc的历史数据和实时数据保存 type BtcDS struct { *DSBase conf *DsConf wsclient [ClientCount]*OKClientWrapper } func init() { drivers[Btc] = newBtcDS } func newBtcDS(conf *DsConf) (DataSource, error) { bds := &BtcDS{ DSBase: NewDsBase(conf), conf: conf, } bds.insMap = btcInsMap() return bds, nil } func (bds *BtcDS) Name() string { return Btc } func (bds *BtcDS) Run() { log.Println("BtcDS.Run") var err error /*bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCUSD) if err != nil { log.Println("NewOKClientWrapper", err) } time.Sleep(time.Second * 3) err = bds.wsclient[0].Emit("ok_btcusd_ticker") if err != nil { log.Println("Emit", err) } go bds.ProcessData(0)*/ bds.wsclient[0], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.cn:10440/websocket/okcoinapi", BTC_REAL, markinfo.BTCCNY) if err != nil { log.Println("NewOKClientWrapper", err) } time.Sleep(time.Second * 3) err = bds.wsclient[0].Emit("ok_btccny_ticker") if err != nil { log.Println("Emit", err) } go bds.ProcessData(0) /*bds.wsclient[2], err = NewOKClientWrapper("http://localhost/", "wss://real.okcoin.com:10440/websocket/okcoinapi", BTC_F_REAL, markinfo.BTCFUSD) if err != nil { log.Println("NewOKClientWrapper", err) } time.Sleep(time.Second * 3) err = bds.wsclient[2].Emit("ok_btcusd_future_ticker_this_week") if err != nil { log.Println("Emit", err) } go bds.ProcessData(2)*/ } func (bds *BtcDS) ProcessData(index int) { for { err := bds.wsclient[index].okclient.RevString(&bds.wsclient[index].datastr) if err == nil { if bds.wsclient[index].datastr == "{\"event\":\"pong\"}" { bds.wsclient[index].lastsec = time.Now().Unix() continue } switch bds.wsclient[index].datatyp { case BTC_REAL: err = bds.parseBTCReal(index) if err != nil { log.Println("parseBTCReal", bds.wsclient[index].datastr, err) } case BTC_F_REAL: err = bds.parseBTCFReal(index) if err != nil { log.Println("parseBTCFReal", bds.wsclient[index].datastr, err) } default: log.Println("data type not supported:", bds.wsclient[index].datatyp) } } else { log.Println(bds.wsclient[index].symbol, "RevString", err) //bds.wsclient[index].okclient.Close() time.Sleep(time.Second * 2) //bds.wsclient[index].okclient.Connect() //time.Sleep(time.Second * 3) //bds.wsclient[index].ReEmit() } } } func (bds *BtcDS) parseBTCFReal(index int) error { var btcfticks []BTCFTickWS err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btcfticks) if err != nil { return err } for _, btcftick := range btcfticks { mk := &Market{} mk.Type = IntBtc mk.InsId = markinfo.BTCFUSD mk.Timestamp = time.Now().Unix() * 1000 var ask, bid PP ask[0] = btcftick.Data.Sell ask[1] = btcftick.Data.Hold_amount bid[0] = btcftick.Data.Buy bid[1] = btcftick.Data.Hold_amount mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) mk.High = btcftick.Data.High mk.LastPrice, _ = strconv.ParseFloat(btcftick.Data.Last, 32) mk.Low = btcftick.Data.Low mk.AllAmount = btcftick.Data.UnitAmount mk.AllVolume, _ = strconv.ParseFloat(btcftick.Data.Vol, 32) bds.Save(mk) } return nil } func (bds *BtcDS) parseBTCReal(index int) error { var btctickwss []BTCTickWS err := json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwss) if err != nil { var btctickwssfirst []BTCTickWSFirst err = json.Unmarshal([]byte(bds.wsclient[index].datastr), &btctickwssfirst) if err != nil { log.Println(bds.wsclient[index].datastr) return err } for _, btctickws := range btctickwssfirst { mk := &Market{} mk.Type = IntBtc mk.InsId = int64(bds.wsclient[index].symbol) mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64) if mk.Timestamp == 0 { return nil //丢弃非行情数据 } var ask, bid PP ask[0], _ = strconv.ParseFloat(btctickws.Data.Sell, 32) bid[0], _ = strconv.ParseFloat(btctickws.Data.Buy, 32) mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) mk.High, _ = strconv.ParseFloat(btctickws.Data.High, 32) mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32) mk.Low, _ = strconv.ParseFloat(btctickws.Data.Low, 32) mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32) if mk.LastPrice != 0 { bds.Save(mk) } } return nil } for _, btctickws := range btctickwss { mk := &Market{} mk.Type = IntBtc mk.InsId = int64(bds.wsclient[index].symbol) mk.Timestamp, _ = strconv.ParseInt(btctickws.Data.Timestamp, 10, 64) if mk.Timestamp == 0 { return nil //丢弃非行情数据 } var ask, bid PP ask[0] = btctickws.Data.Sell bid[0] = btctickws.Data.Buy mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) mk.High = btctickws.Data.High mk.LastPrice, _ = strconv.ParseFloat(btctickws.Data.Last, 32) mk.Low = btctickws.Data.Low mk.LastVolume, _ = strconv.ParseFloat(btctickws.Data.Vol, 32) if mk.LastPrice != 0 { bds.Save(mk) } } return nil } func btcInsMap() map[int64]*Instrument { insMap := make(map[int64]*Instrument) for _, id := range btcInss { x, _ := markinfo.SymbolName(id) u, _ := markinfo.SymbolUint(x) ins := &Instrument{ Id: int64(id), Name: x, ExId: market.Btc, Type: market.Btcs, PriceInc: u, StartTime: time.Now().Unix() * 1000, } insMap[int64(id)] = ins } return insMap }