123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存
- import (
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- //"log"
- "net/http"
- "strconv"
- "time"
- "tickserver/markinfo"
- "tickserver/server/market"
- _ "github.com/go-sql-driver/mysql"
- )
- var btyInss = []int{
- //markinfo.SCCNY,
- //markinfo.BTCCNY,
- //markinfo.BTYCNY,
- //markinfo.ETHCNY,
- //markinfo.ETCCNY,
- //markinfo.ZECCNY,
- //markinfo.BTSCNY,
- //markinfo.LTCCNY,
- //markinfo.BCCCNY,
- //markinfo.NYCCCNY,
- //markinfo.WTCCNY,
- markinfo.ETHBTC,
- markinfo.ETCBTC,
- markinfo.ZECBTC,
- markinfo.LTCBTC,
- markinfo.BCCBTC,
- markinfo.ETHUSDT,
- markinfo.ETCUSDT,
- markinfo.ZECUSDT,
- markinfo.LTCUSDT,
- markinfo.BCCUSDT,
- markinfo.BTCUSDT,
- markinfo.BTYUSDT,
- markinfo.BTSUSDT,
- markinfo.SCUSDT,
- markinfo.BTYBTC,
- markinfo.BTSBTC,
- markinfo.SCBTC,
- markinfo.YCCUSDT,
- markinfo.BTCSUSDT,
- markinfo.DCRUSDT,
- }
- var btyTable = "inss_lastoffsets"
- var last_offset int64
- var bty_reader *TickRead
- // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存
- type BtyDS struct {
- *DSBase
- conf *DsConf
- lastOffsets []int64
- }
- func init() {
- drivers[Bty] = newBtyDS
- }
- func newBtyDS(conf *DsConf) (DataSource, error) {
- err := checkBtyTable()
- if err != nil {
- return nil, err
- }
- bds := &BtyDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- }
- bds.insMap = btyInsMap()
- bds.lastOffsets = make([]int64, len(btyInss))
- for i := 0; i < len(btyInss); i++ {
- symbol, _ := markinfo.SymbolName(btyInss[i])
- lastOffset, err := getLastOffset(Bty, symbol)
- if err != nil {
- return nil, err
- }
- bds.lastOffsets[i] = lastOffset
- }
- bty_reader = NewTickRead()
- return bds, nil
- }
- func (bds *BtyDS) Name() string {
- return Bty
- }
- func (bds *BtyDS) Run() {
- //log.Println("BtyDS.Run")
- for i, offset := range bds.lastOffsets {
- bds.getData(int64(btyInss[i]), offset)
- }
- for {
- mk, err := bty_reader.Read()
- if err != nil {
- fmt.Println(err)
- }
- //log.Println("save fuck")
- bds.Save(mk)
- }
- }
- func btyInsMap() map[int64]*Instrument {
- insMap := make(map[int64]*Instrument)
- for _, id := range btyInss {
- x, _ := markinfo.SymbolName(id)
- u, _ := markinfo.SymbolUint(x)
- ins := &Instrument{
- Id: int64(id),
- Name: x,
- ExId: Bty,
- Type: market.Btcs,
- PriceInc: u,
- StartTime: time.Now().Unix() * 1000,
- }
- insMap[int64(id)] = ins
- }
- return insMap
- }
- func (bds *BtyDS) getData(instrumentId, offset int64) {
- //var reader *TickRead
- if instrumentId == markinfo.BTYCNY {
- GetMacTick(instrumentId, offset)
- } else {
- GetEthTick(instrumentId, offset)
- }
- //if instrumentId == markinfo.BTCCNY {
- //GetBtcTick(instrumentId, offset)
- //}
- }
- func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- //url := "http://47.75.62.253:46658/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- //url := "http://47.74.9.155:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- url := "http://10.0.1.5:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- //url := "http://121.196.205.182:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- response, err := http.Get(url)
- if err != nil {
- //log.Println("f1", url, err)
- return offset, err
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //log.Println("f2", url, err)
- return offset, err
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //log.Println("f3", url, err)
- return offset, err
- }
- //fmt.Println(url)
- dataticks, ok := data["result"].([]interface{})
- if !ok {
- //log.Println("GetEthTickbyPage1", url, string(body))
- return offset, nil
- }
- total := len(dataticks)
- if total == 0 {
- //log.Println("GetEthTickbyPage2", url, string(body))
- }
- //offset += int64(total)
- for _, tmp := range dataticks {
- ti, id, err := toEthMK(tmp.(map[string]interface{}))
- if err != nil {
- fmt.Println("toMac=", err)
- continue
- }
- offset = id
- cb(ti)
- }
- return offset, nil
- }
- func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
- url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- response, err := http.Get(url)
- if err != nil {
- //log.Println("k1", url, err)
- return offset, err
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //log.Println("k2", url, err)
- return offset, err
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //log.Println("k3", url, err)
- return offset, err
- }
- //fmt.Println(url)
- dataticks, ok := data["result"].([]interface{})
- if !ok {
- //log.Println("GetBtcTickbyPage1", url, string(body))
- return offset, nil
- }
- total := len(dataticks)
- if total == 0 {
- //log.Println("GetBtcTickbyPage2", url, string(body))
- }
- offset += int64(total)
- for _, tmp := range dataticks {
- ti, err := toBtcMK(tmp.(map[string]interface{}))
- if err != nil {
- fmt.Println("toBtc=", err)
- continue
- }
- cb(ti)
- }
- return offset, nil
- }
- func toEthMK(data map[string]interface{}) (*Market, int64, error) {
- mk := &Market{}
- mk.Type = IntBty
- symbolId := int32(data["symbolid"].(float64))
- switch symbolId {
- case 327681:
- mk.InsId = markinfo.ETCCNY
- case 131073:
- mk.InsId = markinfo.BTCCNY
- case 262145:
- mk.InsId = markinfo.ETHCNY
- case 458753:
- mk.InsId = markinfo.SCCNY
- case 524289:
- mk.InsId = markinfo.ZECCNY
- case 589825:
- mk.InsId = markinfo.BTSCNY
- case 655361:
- mk.InsId = markinfo.LTCCNY
- case 720897:
- mk.InsId = markinfo.BCCCNY
- case 851969:
- mk.InsId = markinfo.NYCCCNY
- case 917505:
- mk.InsId = markinfo.WTCCNY
- case 262146:
- mk.InsId = markinfo.ETHBTC
- case 327682:
- mk.InsId = markinfo.ETCBTC
- case 524290:
- mk.InsId = markinfo.ZECBTC
- case 655362:
- mk.InsId = markinfo.LTCBTC
- case 720898:
- mk.InsId = markinfo.BCCBTC
- case 262159:
- mk.InsId = markinfo.ETHUSDT
- case 327695:
- mk.InsId = markinfo.ETCUSDT
- case 524303:
- mk.InsId = markinfo.ZECUSDT
- case 655375:
- mk.InsId = markinfo.LTCUSDT
- case 720911:
- mk.InsId = markinfo.BCCUSDT
- case 131087:
- mk.InsId = markinfo.BTCUSDT
- case 196623:
- mk.InsId = markinfo.BTYUSDT
- case 589839:
- mk.InsId = markinfo.BTSUSDT
- case 458767:
- mk.InsId = markinfo.SCUSDT
- case 196610:
- mk.InsId = markinfo.BTYBTC
- case 589826:
- mk.InsId = markinfo.BTSBTC
- case 458754:
- mk.InsId = markinfo.SCBTC
- case 786447:
- mk.InsId = markinfo.YCCUSDT
- case 1048591:
- mk.InsId = markinfo.BTCSUSDT
- case 1114127:
- mk.InsId = markinfo.DCRUSDT
- default:
- return nil, 0, errors.New("invalid symbolid")
- }
- mk.Timestamp = parseKTime(data["time"].(string))
- price := data["price"].(float64)
- volume, _ := data["quantity"].(float64)
- id := int64(data["id"].(float64))
- price /= (10000 * 10000)
- volume /= 100000000
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.AllAmount = volume
- mk.AllVolume = volume
- mk.LastPrice = price
- mk.LastVolume = volume
- var ask, bid PP
- ask[0] = price
- ask[1] = volume
- bid[0] = price
- bid[1] = volume
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, id, nil
- }
- func parseKTime(timeStr string) int64 {
- loc, _ := time.LoadLocation("Asia/Chongqing")
- var year, month, day, hour, minute, second int
- fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second)
- t := time.Date(year, time.Month(month), day, hour, minute, second, 0, loc)
- return t.Unix() * 1000
- }
- func toBtcMK(data map[string]interface{}) (*Market, error) {
- mk := &Market{}
- mk.Type = IntBty
- mk.InsId = markinfo.BTCCNY
- tick_time := data["time"]
- f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
- mk.Timestamp = int64(f_tick_time / (1e6))
- price := data["price"].(float64)
- volume := data["quantity"].(float64)
- price /= (100 * 10000)
- //volume /= 100
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.AllAmount = volume
- mk.AllVolume = volume
- mk.LastPrice = price
- mk.LastVolume = volume
- var ask, bid PP
- ask[0] = price
- ask[1] = volume
- bid[0] = price
- bid[1] = volume
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, nil
- }
- type TickRead struct {
- ch chan *Market
- err chan error
- }
- func NewTickRead() *TickRead {
- ch := make(chan *Market, 1024)
- errch := make(chan error)
- reader := &TickRead{}
- reader.ch = ch
- reader.err = errch
- return reader
- }
- func (tr *TickRead) Read() (*Market, error) {
- tick := <-tr.ch
- if tick == nil {
- return nil, <-tr.err
- }
- return tick, nil
- }
- func GetEthTick(instrumentId, offset int64) error {
- //reader := NewTickRead()
- lasttime := int64(0)
- go func() {
- for {
- offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
- if mk != nil {
- if mk.Timestamp >= lasttime {
- bty_reader.ch <- mk
- lasttime = mk.Timestamp
- }
- }
- })
- time.Sleep(time.Second)
- if err != nil {
- //log.Println("GetEthTick", err, offset)
- continue
- }
- if offset < offset_next {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, offset_next)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
- offset = offset_next
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- func GetBtcTick(instrumentId, offset int64) error {
- //reader := NewTickRead()
- lasttime := int64(0)
- go func() {
- for {
- //offset, _ = getLastOffset(Bty, "BTCCNY")
- offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
- if mk != nil {
- if mk.Timestamp >= lasttime {
- bty_reader.ch <- mk
- lasttime = mk.Timestamp
- }
- }
- })
- time.Sleep(time.Second)
- if err != nil {
- //log.Println("GetBtcTick", err, offset)
- continue
- }
- if offset < offset_next {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, offset_next)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
- offset = offset_next
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- /*创建数据表*/
- func checkBtyTable() error {
- sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
- _, err := db.Exec(sql)
- if err != nil {
- return err
- }
- return nil
- }
- func getLastOffset(ty, insId string) (lastOffset int64, err error) {
- szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
- row := db.QueryRow(szSelectTable)
- err = row.Scan(&lastOffset)
- if err == sql.ErrNoRows {
- q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
- _, err = db.Exec(q)
- if err != nil {
- fmt.Println("getLastOffset", err)
- return lastOffset, err
- }
- return lastOffset, nil
- }
- return lastOffset, err
- }
- func updateLastoffset(typ, insId string, lastOffset int64) error {
- //INSERT INTO
- q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
- _, err := db.Exec(q)
- if err != nil {
- fmt.Println("updateLastoffset", err)
- }
- return err
- }
- func GetMacTick(instrumentId, ntime int64) error {
- //reader := NewTickRead()
- go func() {
- for {
- err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
- if mk != nil {
- bty_reader.ch <- mk
- }
- })
- time.Sleep(time.Second)
- //fmt.Println(err, ntime, isEndPage)
- if err != nil {
- //log.Println("GetMacTick", err)
- continue
- }
- if ntime < _ntime {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, _ntime)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration(total/100) * time.Second)
- ntime = _ntime
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
- url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
- //fmt.Println(url)
- response, err := http.Get(url)
- if err != nil {
- //fmt.Println("macoin", err)
- return err, 0, 0
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //fmt.Println(err)
- return err, 0, 0
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //fmt.Println("json Unmarshal", err)
- return err, 0, 0
- }
- total, err := strconv.ParseInt(data["total"].(string), 10, 32)
- if err != nil {
- //fmt.Println("ParseInt total ", err)
- return err, 0, 0
- }
- dataticks := data["data"].([]interface{})
- ntime := int64(0)
- for _, tmp := range dataticks {
- mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
- if mk == nil || err != nil {
- continue
- }
- ntime = (mk.Timestamp / 1000) + 1
- if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
- continue
- }
- cb(mk)
- }
- return nil, ntime, total
- }
- func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
- mk := &Market{}
- mk.InsId = instrumentId
- mk.Type = IntBty
- timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
- if err != nil {
- return nil, err
- }
- match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
- if err != nil {
- return nil, err
- }
- if last_offset > int64(match_id) {
- return nil, nil
- }
- last_offset = int64(match_id)
- mk.Timestamp = timestamp * 1000
- // value
- var datavalue map[string]interface{}
- var datasell map[string]interface{}
- err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
- if err != nil {
- return nil, err
- }
- if len(datavalue["sell"].([]interface{})) == 0 {
- return mk, nil
- }
- datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
- price, err := strconv.ParseFloat(datasell["price"].(string), 64)
- if err != nil {
- return nil, err
- }
- price /= 1000
- volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
- if err != nil {
- return nil, err
- }
- if volume < 0 {
- volume = -volume
- }
- mk.AllAmount = float64(volume)
- mk.AllVolume = float64(volume)
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.LastPrice = price
- mk.LastVolume = float64(volume)
- var ask, bid PP
- ask[0] = price
- ask[1] = float64(volume)
- bid[0] = price
- bid[1] = float64(volume)
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, nil
- }
|