// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package tick // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存 import ( "database/sql" "encoding/json" //"errors" "fmt" "io/ioutil" //"log" //"bytes" "net/http" "strconv" "time" "tickserver/markinfo" "tickserver/server/market" _ "github.com/go-sql-driver/mysql" ) var btyInss = []int{ markinfo.ETHBTC, markinfo.ETCBTC, markinfo.ZECBTC, markinfo.LTCBTC, markinfo.BCCBTC, markinfo.ETHUSDT, markinfo.ETCUSDT, markinfo.ZECUSDT, markinfo.LTCUSDT, markinfo.BCCUSDT, markinfo.BTCUSDT, markinfo.BTYUSDT, markinfo.BTSUSDT, markinfo.SCUSDT, markinfo.DCRUSDT, markinfo.BNTUSDT, markinfo.BNTBTC, markinfo.SCTCUSDT, markinfo.SCTCBTC, markinfo.YCCUSDT, markinfo.YCCBTC, markinfo.YCCETH, markinfo.JBUSDT, markinfo.JBBTC, markinfo.JBETH, markinfo.OPTCUSDT, markinfo.OPTCBTC, markinfo.OPTCETH, markinfo.BTCETH, markinfo.BCCETH, markinfo.ZECETH, markinfo.ETCETH, markinfo.LTCETH, markinfo.STILT, markinfo.ITVBUSDT, markinfo.ITVBBTC, markinfo.ITVBETH, markinfo.BTYETH, markinfo.FHUSDT, markinfo.CWVUSDT, markinfo.FHETH, markinfo.TMCETH, markinfo.FANSUSDT, markinfo.FANSBTC, markinfo.FANSETH, markinfo.WTBWTC, markinfo.CNSUSDT, } var inss = map[int]string{ markinfo.ETHBTC: "ETH_BTC", markinfo.ETCBTC: "ETC_BTC", markinfo.ZECBTC: "ZEC_BTC", markinfo.LTCBTC: "LTC_BTC", markinfo.BCCBTC: "BCC_BTC", markinfo.ETHUSDT: "ETH_USDT", markinfo.ETCUSDT: "ETC_USDT", markinfo.ZECUSDT: "ZEC_USDT", markinfo.LTCUSDT: "LTC_USDT", markinfo.BCCUSDT: "BCC_USDT", markinfo.BTCUSDT: "BTC_USDT", markinfo.BTYUSDT: "BTY_USDT", markinfo.BTSUSDT: "BTS_USDT", markinfo.SCUSDT: "SC_USDT", markinfo.DCRUSDT: "DCR_USDT", markinfo.BNTUSDT: "BNT_USDT", markinfo.BNTBTC: "BNT_BTC", markinfo.SCTCUSDT: "SCTC_USDT", markinfo.SCTCBTC: "SCTC_BTC", markinfo.YCCUSDT: "YCC_USDT", markinfo.YCCBTC: "YCC_BTC", markinfo.YCCETH: "YCC_ETH", markinfo.JBUSDT: "JB_USDT", markinfo.JBBTC: "JB_BTC", markinfo.JBETH: "JB_ETH", markinfo.OPTCUSDT: "OPTC_USDT", markinfo.OPTCBTC: "OPTC_BTC", markinfo.OPTCETH: "OPTC_ETH", markinfo.BTCETH: "BTC_ETH", markinfo.BCCETH: "BCC_ETH", markinfo.ZECETH: "ZEC_ETH", markinfo.ETCETH: "ETC_ETH", markinfo.LTCETH: "LTC_ETH", markinfo.STILT: "ST_ILT", markinfo.ITVBUSDT: "ITVB_USDT", markinfo.ITVBBTC: "ITVB_BTC", markinfo.ITVBETH: "ITVB_ETH", markinfo.BTYETH: "BTY_ETH", markinfo.FHUSDT: "FH_USDT", markinfo.CWVUSDT: "CWV_USDT", markinfo.FHETH: "FH_ETH", markinfo.TMCETH: "TMC_ETH", markinfo.FANSUSDT: "FANS_USDT", markinfo.FANSBTC: "FANS_BTC", markinfo.FANSETH: "FANS_ETH", markinfo.WTBWTC: "WTB_WTC", markinfo.CNSUSDT: "CNS_USDT", } var btyTable = "inss_lastoffsets" var last_offset int64 var bty_reader *TickRead // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存 type BtyDS struct { *DSBase conf *DsConf lastOffsets []int64 } func init() { drivers[Bty] = newBtyDS } func newBtyDS(conf *DsConf) (DataSource, error) { err := checkBtyTable() if err != nil { return nil, err } bds := &BtyDS{ DSBase: NewDsBase(conf), conf: conf, } bds.insMap = btyInsMap() bds.lastOffsets = make([]int64, len(btyInss)) for i := 0; i < len(btyInss); i++ { symbol, _ := markinfo.SymbolName(btyInss[i]) lastOffset, err := getLastOffset(Bty, symbol) if err != nil { return nil, err } bds.lastOffsets[i] = lastOffset } bty_reader = NewTickRead() return bds, nil } func (bds *BtyDS) Name() string { return Bty } func (bds *BtyDS) Run() { //log.Println("BtyDS.Run") for i, offset := range bds.lastOffsets { bds.getData(int64(btyInss[i]), offset) } for { mk, err := bty_reader.Read() if err != nil { fmt.Println("1", err) } //log.Println("save fuck") bds.Save(mk) } } func btyInsMap() map[int64]*Instrument { insMap := make(map[int64]*Instrument) for _, id := range btyInss { x, _ := markinfo.SymbolName(id) u, _ := markinfo.SymbolUint(x) ins := &Instrument{ Id: int64(id), Name: x, ExId: Bty, Type: market.Btcs, PriceInc: u, StartTime: time.Now().Unix() * 1000, } insMap[int64(id)] = ins } return insMap } func (bds *BtyDS) getData(instrumentId, offset int64) { //var reader *TickRead if instrumentId == markinfo.BTYCNY { GetMacTick(instrumentId, offset) } else { GetEthTick(instrumentId, offset) } //if instrumentId == markinfo.BTCCNY { //GetBtcTick(instrumentId, offset) //} } func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) { symbol, _ := markinfo.SymbolName(int(instrumentId)) //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" url := "http://10.0.1.177:80/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" response, err := http.Get(url) if err != nil { return offset, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //fmt.Println("f2", url, err) return offset, err } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //fmt.Println("f3", url, err) return offset, err } //fmt.Println(url) dataticks, ok := data["result"].([]interface{}) if !ok { //fmt.Println("GetEthTickbyPage1", req, string(body)) return offset, nil } total := len(dataticks) if total == 0 { //log.Println("GetEthTickbyPage2", url, string(body)) } //offset += int64(total) for _, tmp := range dataticks { ti, id, err := toEthMK(tmp.(map[string]interface{})) if err != nil { fmt.Println("toMac=", err) continue } ti.InsId = instrumentId offset = id cb(ti) } return offset, nil } /* func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) { symbol := inss[int(instrumentId)] //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" //response, err := http.Get(url) //if err != nil { //return offset, err //} params := make(map[string]interface{}) params["symbol"] = symbol params["offset"] = offset params["limit"] = 100 req := make(map[string]interface{}) req["jsonrpc"] = "2.0" req["method"] = "kline" req["id"] = 0 req["params"] = params bytesData, err := json.Marshal(req) if err != nil { //fmt.Println("2", err) return offset, err } reader := bytes.NewReader(bytesData) url := "http://47.91.198.174:45656" request, err := http.NewRequest("POST", url, reader) if err != nil { //fmt.Println("3", err) return offset, err } request.Header.Set("Content-Type", "application/json;charset=UTF-8") client := http.Client{} response, err := client.Do(request) if err != nil { //fmt.Println("4", err) return offset, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //fmt.Println("f2", url, err) return offset, err } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //fmt.Println("f3", url, err) return offset, err } //fmt.Println(url) dataticks, ok := data["result"].([]interface{}) if !ok { //fmt.Println("GetEthTickbyPage1", req, string(body)) return offset, nil } total := len(dataticks) if total == 0 { //log.Println("GetEthTickbyPage2", url, string(body)) } //offset += int64(total) for _, tmp := range dataticks { ti, id, err := toEthMK(tmp.(map[string]interface{})) if err != nil { fmt.Println("toMac=", err) continue } ti.InsId = instrumentId offset = id cb(ti) } return offset, nil } */ func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) { url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100" response, err := http.Get(url) if err != nil { //log.Println("k1", url, err) return offset, err } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //log.Println("k2", url, err) return offset, err } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //log.Println("k3", url, err) return offset, err } //fmt.Println(url) dataticks, ok := data["result"].([]interface{}) if !ok { //log.Println("GetBtcTickbyPage1", url, string(body)) return offset, nil } total := len(dataticks) if total == 0 { //log.Println("GetBtcTickbyPage2", url, string(body)) } offset += int64(total) for _, tmp := range dataticks { ti, err := toBtcMK(tmp.(map[string]interface{})) if err != nil { fmt.Println("toBtc=", err) continue } cb(ti) } return offset, nil } func toEthMK(data map[string]interface{}) (*Market, int64, error) { mk := &Market{} mk.Type = IntBty mk.Timestamp = parseKTime(data["time"].(string)) price := data["price"].(float64) volume, _ := data["quantity"].(float64) id := int64(data["id"].(float64)) price /= (10000 * 10000) volume /= 100000000 mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.AllAmount = volume mk.AllVolume = volume mk.LastPrice = price mk.LastVolume = volume var ask, bid PP ask[0] = price ask[1] = volume bid[0] = price bid[1] = volume mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, id, nil } //func toEthMK(data map[string]interface{}) (*Market, int64, error) { //mk := &Market{} //mk.Type = IntBty /*symbolId := int32(data["symbolid"].(float64)) switch symbolId { case 0x50001: mk.InsId = markinfo.ETCCNY case 0x20001: mk.InsId = markinfo.BTCCNY case 0x40001: mk.InsId = markinfo.ETHCNY case 0x70001: mk.InsId = markinfo.SCCNY case 0x80001: mk.InsId = markinfo.ZECCNY case 0x90001: mk.InsId = markinfo.BTSCNY case 0xA0001: mk.InsId = markinfo.LTCCNY case 0xB0001: mk.InsId = markinfo.BCCCNY case 0xD0001: mk.InsId = markinfo.NYCCCNY case 0xE0001: mk.InsId = markinfo.WTCCNY case 0x40002: mk.InsId = markinfo.ETHBTC case 0x50002: mk.InsId = markinfo.ETCBTC case 0x80002: mk.InsId = markinfo.ZECBTC case 0xA0002: mk.InsId = markinfo.LTCBTC case 0xB0002: mk.InsId = markinfo.BCCBTC case 0x4000F: mk.InsId = markinfo.ETHUSDT case 0x5000F: mk.InsId = markinfo.ETCUSDT case 0x8000F: mk.InsId = markinfo.ZECUSDT case 0xA000F: mk.InsId = markinfo.LTCUSDT case 0xB000F: mk.InsId = markinfo.BCCUSDT case 0x2000F: mk.InsId = markinfo.BTCUSDT case 0x3000F: mk.InsId = markinfo.BTYUSDT case 0x9000F: mk.InsId = markinfo.BTSUSDT case 0x7000F: mk.InsId = markinfo.SCUSDT case 0x30002: mk.InsId = markinfo.BTYBTC case 0x90002: mk.InsId = markinfo.BTSBTC case 0x70002: mk.InsId = markinfo.SCBTC case 0xC000F: mk.InsId = markinfo.YCCUSDT case 0x11000F: mk.InsId = markinfo.DCRUSDT case 0x10000F: mk.InsId = markinfo.BNTUSDT case 0x100002: mk.InsId = markinfo.BNTBTC case 0x12000F: mk.InsId = markinfo.SCTCUSDT case 0x120002: mk.InsId = markinfo.SCTCBTC case 0xC0002: mk.InsId = markinfo.YCCBTC case 0xC0004: mk.InsId = markinfo.YCCETH case 0x15000F: mk.InsId = markinfo.JBUSDT case 0x150002: mk.InsId = markinfo.JBBTC case 0x150004: mk.InsId = markinfo.JBETH case 0x16000F: mk.InsId = markinfo.OPTCUSDT case 0x160002: mk.InsId = markinfo.OPTCBTC case 0x160004: mk.InsId = markinfo.OPTCETH case 0x20004: mk.InsId = markinfo.BTCETH case 0xB0004: mk.InsId = markinfo.BCCETH case 0x80004: mk.InsId = markinfo.ZECETH case 0x50004: mk.InsId = markinfo.ETCETH case 0xA0004: mk.InsId = markinfo.LTCETH case 0x130014: mk.InsId = markinfo.STILT case 0x17000F: mk.InsId = markinfo.ITVBUSDT case 0x170002: mk.InsId = markinfo.ITVBBTC case 0x170004: mk.InsId = markinfo.ITVBETH case 0x30004: mk.InsId = markinfo.BTYETH case 0x1A000F: mk.InsId = markinfo.FHUSDT case 0x1B000F: mk.InsId = markinfo.CWVUSDT case 0x1A0004: mk.InsId = markinfo.FHETH case 0x1C0004: mk.InsId = markinfo.TMCETH case 0x1D000F: mk.InsId = markinfo.FANSUSDT case 0x1D0002: mk.InsId = markinfo.FANSBTC case 0x1D0004: mk.InsId = markinfo.FANSETH case 0x1F0020: mk.InsId = markinfo.WTBWTC case 0x21000F: mk.InsId = markinfo.CNSUSDT default: return nil, 0, errors.New("invalid symbolid") } mk.Timestamp = parseKTime(data["time"].(string)) price := data["price"].(float64) volume, _ := data["quantity"].(float64) id := int64(data["id"].(float64)) price /= (10000 * 10000) volume /= 100000000 mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.AllAmount = volume mk.AllVolume = volume mk.LastPrice = price mk.LastVolume = volume var ask, bid PP ask[0] = price ask[1] = volume bid[0] = price bid[1] = volume mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid)*/ //mk.Timestamp = int64(data["time"].(float64)) * 1000 //price := data["price"].(float64) //volume, _ := data["amount"].(float64) //id := int64(data["id"].(float64)) //price /= (10000 * 10000) //volume /= 100000000 //mk.Close = price //mk.Open = price //mk.High = price //mk.Low = price //mk.AllAmount = volume //mk.AllVolume = volume //mk.LastPrice = price //mk.LastVolume = volume //var ask, bid PP //ask[0] = price //ask[1] = volume //bid[0] = price //bid[1] = volume //mk.Asks = append(mk.Asks, ask) //mk.Bids = append(mk.Bids, bid) //return mk, id, nil //} func parseKTime(timeStr string) int64 { loc, _ := time.LoadLocation("Asia/Chongqing") var year, month, day, hour, minute, second int fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second) t := time.Date(year, time.Month(month), day, hour, minute, second, 0, loc) return t.Unix() * 1000 } func toBtcMK(data map[string]interface{}) (*Market, error) { mk := &Market{} mk.Type = IntBty mk.InsId = markinfo.BTCCNY tick_time := data["time"] f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32) mk.Timestamp = int64(f_tick_time / (1e6)) price := data["price"].(float64) volume := data["quantity"].(float64) price /= (100 * 10000) //volume /= 100 mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.AllAmount = volume mk.AllVolume = volume mk.LastPrice = price mk.LastVolume = volume var ask, bid PP ask[0] = price ask[1] = volume bid[0] = price bid[1] = volume mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, nil } type TickRead struct { ch chan *Market err chan error } func NewTickRead() *TickRead { ch := make(chan *Market, 1024) errch := make(chan error) reader := &TickRead{} reader.ch = ch reader.err = errch return reader } func (tr *TickRead) Read() (*Market, error) { tick := <-tr.ch if tick == nil { return nil, <-tr.err } return tick, nil } func GetEthTick(instrumentId, offset int64) error { //reader := NewTickRead() lasttime := int64(0) go func() { for { offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) { if mk != nil { if mk.Timestamp >= lasttime { bty_reader.ch <- mk lasttime = mk.Timestamp } } }) time.Sleep(time.Second) if err != nil { //log.Println("GetEthTick", err, offset) continue } if offset < offset_next { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, offset_next) if err != nil { continue } //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second) offset = offset_next } //if !isEndPage { //continue //} } }() return nil } func GetBtcTick(instrumentId, offset int64) error { //reader := NewTickRead() lasttime := int64(0) go func() { for { //offset, _ = getLastOffset(Bty, "BTCCNY") offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) { if mk != nil { if mk.Timestamp >= lasttime { bty_reader.ch <- mk lasttime = mk.Timestamp } } }) time.Sleep(time.Second) if err != nil { //log.Println("GetBtcTick", err, offset) continue } if offset < offset_next { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, offset_next) if err != nil { continue } //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second) offset = offset_next } //if !isEndPage { //continue //} } }() return nil } /*创建数据表*/ func checkBtyTable() error { sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable) _, err := db.Exec(sql) if err != nil { return err } return nil } func getLastOffset(ty, insId string) (lastOffset int64, err error) { szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';" row := db.QueryRow(szSelectTable) err = row.Scan(&lastOffset) if err == sql.ErrNoRows { q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset) _, err = db.Exec(q) if err != nil { fmt.Println("getLastOffset", err) return lastOffset, err } return lastOffset, nil } return lastOffset, err } func updateLastoffset(typ, insId string, lastOffset int64) error { //INSERT INTO q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId) _, err := db.Exec(q) if err != nil { fmt.Println("updateLastoffset", err) } return err } func GetMacTick(instrumentId, ntime int64) error { //reader := NewTickRead() go func() { for { err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) { if mk != nil { bty_reader.ch <- mk } }) time.Sleep(time.Second) //fmt.Println(err, ntime, isEndPage) if err != nil { //log.Println("GetMacTick", err) continue } if ntime < _ntime { symbol, _ := markinfo.SymbolName(int(instrumentId)) err := updateLastoffset(Bty, symbol, _ntime) if err != nil { continue } //time.Sleep(time.Duration(total/100) * time.Second) ntime = _ntime } //if !isEndPage { //continue //} } }() return nil } func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) { url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2) //fmt.Println(url) response, err := http.Get(url) if err != nil { //fmt.Println("macoin", err) return err, 0, 0 } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { //fmt.Println(err) return err, 0, 0 } var data map[string]interface{} err = json.Unmarshal(body, &data) if err != nil { //fmt.Println("json Unmarshal", err) return err, 0, 0 } total, err := strconv.ParseInt(data["total"].(string), 10, 32) if err != nil { //fmt.Println("ParseInt total ", err) return err, 0, 0 } dataticks := data["data"].([]interface{}) ntime := int64(0) for _, tmp := range dataticks { mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId) if mk == nil || err != nil { continue } ntime = (mk.Timestamp / 1000) + 1 if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 { continue } cb(mk) } return nil, ntime, total } func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) { mk := &Market{} mk.InsId = instrumentId mk.Type = IntBty timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32) if err != nil { return nil, err } match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32) if err != nil { return nil, err } if last_offset > int64(match_id) { return nil, nil } last_offset = int64(match_id) mk.Timestamp = timestamp * 1000 // value var datavalue map[string]interface{} var datasell map[string]interface{} err = json.Unmarshal([]byte(data["value"].(string)), &datavalue) if err != nil { return nil, err } if len(datavalue["sell"].([]interface{})) == 0 { return mk, nil } datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{}) price, err := strconv.ParseFloat(datasell["price"].(string), 64) if err != nil { return nil, err } price /= 1000 volume, err := strconv.ParseInt(data["amount"].(string), 10, 64) if err != nil { return nil, err } if volume < 0 { volume = -volume } mk.AllAmount = float64(volume) mk.AllVolume = float64(volume) mk.Close = price mk.Open = price mk.High = price mk.Low = price mk.LastPrice = price mk.LastVolume = float64(volume) var ask, bid PP ask[0] = price ask[1] = float64(volume) bid[0] = price bid[1] = float64(volume) mk.Asks = append(mk.Asks, ask) mk.Bids = append(mk.Bids, bid) return mk, nil }