// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存 import ( "database/sql" "encoding/json" "errors" "fmt" "io/ioutil" //"log" "net/http" "strconv" "time" "tickserver/markinfo" "tickserver/server/market" _ "github.com/go-sql-driver/mysql" ) var btyInss = []int{ //markinfo.SCCNY, //markinfo.BTCCNY, //markinfo.BTYCNY, //markinfo.ETHCNY, //markinfo.ETCCNY, //markinfo.ZECCNY, //markinfo.BTSCNY, //markinfo.LTCCNY, //markinfo.BCCCNY, //markinfo.NYCCCNY, //markinfo.WTCCNY, markinfo.ETHBTC, markinfo.ETCBTC, markinfo.ZECBTC, markinfo.LTCBTC, markinfo.BCCBTC, markinfo.ETHUSDT, markinfo.ETCUSDT, markinfo.ZECUSDT, markinfo.LTCUSDT, markinfo.BCCUSDT, markinfo.BTCUSDT, markinfo.BTYUSDT, markinfo.BTSUSDT, markinfo.SCUSDT, markinfo.BTYBTC, markinfo.BTSBTC, markinfo.SCBTC, markinfo.YCCUSDT, markinfo.BTCSUSDT, markinfo.DCRUSDT, } var btyTable = "inss_lastoffsets" var last_offset int64 var bty_reader *TickRead // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存 type BtyDS struct { *DSBase conf *DsConf lastOffsets []int64 } func init() { drivers[Bty] = newBtyDS } func newBtyDS(conf *DsConf) (DataSource, error) { err := checkBtyTable() if err != nil { return nil, err } bds := &BtyDS{ DSBase: NewDsBase(conf), conf: conf, } bds.insMap = btyInsMap() bds.lastOffsets = make([]int64, len(btyInss)) for i := 0; i < len(btyInss); i++ { symbol, _ := markinfo.SymbolName(btyInss[i]) lastOffset, err := getLastOffset(Bty, symbol) if err != nil { return nil, err } bds.lastOffsets[i] = lastOffset } bty_reader = NewTickRead() return bds, nil } func (bds *BtyDS) Name() string { return Bty } func (bds *BtyDS) Run() { //log.Println("BtyDS.Run") for i, offset := range bds.lastOffsets { bds.getData(int64(btyInss[i]), offset) } for { mk, err := bty_reader.Read() if err != nil { fmt.Println(err) } //log.Println("save fuck") bds.Save(mk) } } func btyInsMap() map[int64]*Instrument { insMap := make(map[int64]*Instrument) for _, id := range btyInss { x, _ := markinfo.SymbolName(id) u, _ := markinfo.SymbolUint(x) ins := &Instrument{ Id: int64(id), Name: x, ExId: Bty, Type: market.Btcs, PriceInc: u, StartTime: time.Now().Unix() * 1000, } insMap[int64(id)] = ins } return insMap } func (bds *BtyDS) getData(instrumentId, offset int64) { //var reader *TickRead if instrumentId == markinfo.BTYCNY { GetMacTick(instrumentId, offset) } else { GetEthTick(instrumentId, offset) } //if instrumentId == markinfo.BTCCNY { //GetBtcTick(instrumentId, offset) //} } func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) { symbol, _ := markinfo.SymbolName(int(instrumentId)) url := "http://47.74.9.155:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" //url := "http://121.196.205.182:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" response, err := http.Get(url) if err != nil { //log.Println("f1", url, err) return offset, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //log.Println("f2", url, err) return offset, err } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //log.Println("f3", url, err) return offset, err } //fmt.Println(url) dataticks, ok := data["result"].([]interface{}) if !ok { //log.Println("GetEthTickbyPage1", url, string(body)) return offset, nil } total := len(dataticks) if total == 0 { //log.Println("GetEthTickbyPage2", url, string(body)) } //offset += int64(total) for _, tmp := range dataticks { ti, id, err := toEthMK(tmp.(map[string]interface{})) if err != nil { fmt.Println("toMac=", err) continue } offset = id cb(ti) } return offset, nil } func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) { url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" response, err := http.Get(url) if err != nil { //log.Println("k1", url, err) return offset, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //log.Println("k2", url, err) return offset, err } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //log.Println("k3", url, err) return offset, err } //fmt.Println(url) dataticks, ok := data["result"].([]interface{}) if !ok { //log.Println("GetBtcTickbyPage1", url, string(body)) return offset, nil } total := len(dataticks) if total == 0 { //log.Println("GetBtcTickbyPage2", url, string(body)) } offset += int64(total) for _, tmp := range dataticks { ti, err := toBtcMK(tmp.(map[string]interface{})) if err != nil { fmt.Println("toBtc=", err) continue } cb(ti) } return offset, nil } func toEthMK(data map[string]interface{}) (*Market, int64, error) { mk := &Market{} mk.Type = IntBty symbolId := int32(data["symbolid"].(float64)) switch symbolId { case 327681: mk.InsId = markinfo.ETCCNY case 131073: mk.InsId = markinfo.BTCCNY case 262145: mk.InsId = markinfo.ETHCNY case 458753: mk.InsId = markinfo.SCCNY case 524289: mk.InsId = markinfo.ZECCNY case 589825: mk.InsId = markinfo.BTSCNY case 655361: mk.InsId = markinfo.LTCCNY case 720897: mk.InsId = markinfo.BCCCNY case 851969: mk.InsId = markinfo.NYCCCNY case 917505: mk.InsId = markinfo.WTCCNY case 262146: mk.InsId = markinfo.ETHBTC case 327682: mk.InsId = markinfo.ETCBTC case 524290: mk.InsId = markinfo.ZECBTC case 655362: mk.InsId = markinfo.LTCBTC case 720898: mk.InsId = markinfo.BCCBTC case 262159: mk.InsId = markinfo.ETHUSDT case 327695: mk.InsId = markinfo.ETCUSDT case 524303: mk.InsId = markinfo.ZECUSDT case 655375: mk.InsId = markinfo.LTCUSDT case 720911: mk.InsId = markinfo.BCCUSDT case 131087: mk.InsId = markinfo.BTCUSDT case 196623: mk.InsId = markinfo.BTYUSDT case 589839: mk.InsId = markinfo.BTSUSDT case 458767: mk.InsId = markinfo.SCUSDT case 196610: mk.InsId = markinfo.BTYBTC case 589826: mk.InsId = markinfo.BTSBTC case 458754: mk.InsId = markinfo.SCBTC case 786447: mk.InsId = markinfo.YCCUSDT case 1048591: mk.InsId = markinfo.BTCSUSDT case 1114127: mk.InsId = markinfo.DCRUSDT default: return nil, 0, errors.New("invalid symbolid") } mk.Timestamp = parseKTime(data["time"].(string)) price := data["price"].(float64) volume, _ := data["quantity"].(float64) id := int64(data["id"].(float64)) price /= (10000 * 10000) volume /= 100000000 mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.AllAmount = volume mk.AllVolume = volume mk.LastPrice = price mk.LastVolume = volume var ask, bid PP ask[0] = price ask[1] = volume bid[0] = price bid[1] = volume mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, id, nil } func parseKTime(timeStr string) int64 { var year, month, day, hour, minute, second int fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second) t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local) return t.Unix() * 1000 } func toBtcMK(data map[string]interface{}) (*Market, error) { mk := &Market{} mk.Type = IntBty mk.InsId = markinfo.BTCCNY tick_time := data["time"] f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32) mk.Timestamp = int64(f_tick_time / (1e6)) price := data["price"].(float64) volume := data["quantity"].(float64) price /= (100 * 10000) //volume /= 100 mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.AllAmount = volume mk.AllVolume = volume mk.LastPrice = price mk.LastVolume = volume var ask, bid PP ask[0] = price ask[1] = volume bid[0] = price bid[1] = volume mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, nil } type TickRead struct { ch chan *Market err chan error } func NewTickRead() *TickRead { ch := make(chan *Market, 1024) errch := make(chan error) reader := &TickRead{} reader.ch = ch reader.err = errch return reader } func (tr *TickRead) Read() (*Market, error) { tick := <-tr.ch if tick == nil { return nil, <-tr.err } return tick, nil } func GetEthTick(instrumentId, offset int64) error { //reader := NewTickRead() lasttime := int64(0) go func() { for { offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) { if mk != nil { if mk.Timestamp >= lasttime { bty_reader.ch <- mk lasttime = mk.Timestamp } } }) time.Sleep(time.Second) if err != nil { //log.Println("GetEthTick", err, offset) continue } if offset < offset_next { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, offset_next) if err != nil { continue } //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second) offset = offset_next } //if !isEndPage { //continue //} } }() return nil } func GetBtcTick(instrumentId, offset int64) error { //reader := NewTickRead() lasttime := int64(0) go func() { for { //offset, _ = getLastOffset(Bty, "BTCCNY") offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) { if mk != nil { if mk.Timestamp >= lasttime { bty_reader.ch <- mk lasttime = mk.Timestamp } } }) time.Sleep(time.Second) if err != nil { //log.Println("GetBtcTick", err, offset) continue } if offset < offset_next { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, offset_next) if err != nil { continue } //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second) offset = offset_next } //if !isEndPage { //continue //} } }() return nil } /*创建数据表*/ func checkBtyTable() error { sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable) _, err := db.Exec(sql) if err != nil { return err } return nil } func getLastOffset(ty, insId string) (lastOffset int64, err error) { szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';" row := db.QueryRow(szSelectTable) err = row.Scan(&lastOffset) if err == sql.ErrNoRows { q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset) _, err = db.Exec(q) if err != nil { fmt.Println("getLastOffset", err) return lastOffset, err } return lastOffset, nil } return lastOffset, err } func updateLastoffset(typ, insId string, lastOffset int64) error { //INSERT INTO q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId) _, err := db.Exec(q) if err != nil { fmt.Println("updateLastoffset", err) } return err } func GetMacTick(instrumentId, ntime int64) error { //reader := NewTickRead() go func() { for { err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) { if mk != nil { bty_reader.ch <- mk } }) time.Sleep(time.Second) //fmt.Println(err, ntime, isEndPage) if err != nil { //log.Println("GetMacTick", err) continue } if ntime < _ntime { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, _ntime) if err != nil { continue } //time.Sleep(time.Duration(total/100) * time.Second) ntime = _ntime } //if !isEndPage { //continue //} } }() return nil } func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) { url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2) //fmt.Println(url) response, err := http.Get(url) if err != nil { //fmt.Println("macoin", err) return err, 0, 0 } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //fmt.Println(err) return err, 0, 0 } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //fmt.Println("json Unmarshal", err) return err, 0, 0 } total, err := strconv.ParseInt(data["total"].(string), 10, 32) if err != nil { //fmt.Println("ParseInt total ", err) return err, 0, 0 } dataticks := data["data"].([]interface{}) ntime := int64(0) for _, tmp := range dataticks { mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId) if mk == nil || err != nil { continue } ntime = (mk.Timestamp / 1000) + 1 if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 { continue } cb(mk) } return nil, ntime, total } func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) { mk := &Market{} mk.InsId = instrumentId mk.Type = IntBty timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32) if err != nil { return nil, err } match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32) if err != nil { return nil, err } if last_offset > int64(match_id) { return nil, nil } last_offset = int64(match_id) mk.Timestamp = timestamp * 1000 // value var datavalue map[string]interface{} var datasell map[string]interface{} err = json.Unmarshal([]byte(data["value"].(string)), &datavalue) if err != nil { return nil, err } if len(datavalue["sell"].([]interface{})) == 0 { return mk, nil } datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{}) price, err := strconv.ParseFloat(datasell["price"].(string), 64) if err != nil { return nil, err } price /= 1000 volume, err := strconv.ParseInt(data["amount"].(string), 10, 64) if err != nil { return nil, err } if volume < 0 { volume = -volume } mk.AllAmount = float64(volume) mk.AllVolume = float64(volume) mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.LastPrice = price mk.LastVolume = float64(volume) var ask, bid PP ask[0] = price ask[1] = float64(volume) bid[0] = price bid[1] = float64(volume) mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, nil }