12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存
- import (
- "database/sql"
- "encoding/json"
- //"errors"
- "fmt"
- "io/ioutil"
- //"log"
- //"bytes"
- "net/http"
- "strconv"
- "time"
- "tickserver/markinfo"
- "tickserver/server/market"
- _ "github.com/go-sql-driver/mysql"
- )
- var btyInss = []int{
- markinfo.ETHBTC,
- markinfo.ETCBTC,
- markinfo.ZECBTC,
- markinfo.LTCBTC,
- markinfo.BCCBTC,
- markinfo.ETHUSDT,
- markinfo.ETCUSDT,
- markinfo.ZECUSDT,
- markinfo.LTCUSDT,
- markinfo.BCCUSDT,
- markinfo.BTCUSDT,
- markinfo.BTYUSDT,
- markinfo.BTSUSDT,
- markinfo.SCUSDT,
- markinfo.DCRUSDT,
- markinfo.BNTUSDT,
- markinfo.BNTBTC,
- markinfo.SCTCUSDT,
- markinfo.SCTCBTC,
- markinfo.YCCUSDT,
- markinfo.YCCBTC,
- markinfo.YCCETH,
- markinfo.JBUSDT,
- markinfo.JBBTC,
- markinfo.JBETH,
- markinfo.OPTCUSDT,
- markinfo.OPTCBTC,
- markinfo.OPTCETH,
- markinfo.BTCETH,
- markinfo.BCCETH,
- markinfo.ZECETH,
- markinfo.ETCETH,
- markinfo.LTCETH,
- markinfo.STILT,
- markinfo.ITVBUSDT,
- markinfo.ITVBBTC,
- markinfo.ITVBETH,
- markinfo.BTYETH,
- markinfo.FHUSDT,
- markinfo.CWVUSDT,
- markinfo.FHETH,
- markinfo.TMCETH,
- markinfo.FANSUSDT,
- markinfo.FANSBTC,
- markinfo.FANSETH,
- markinfo.WTBWTC,
- markinfo.CNSUSDT,
- }
- var inss = map[int]string{
- markinfo.ETHBTC: "ETH_BTC",
- markinfo.ETCBTC: "ETC_BTC",
- markinfo.ZECBTC: "ZEC_BTC",
- markinfo.LTCBTC: "LTC_BTC",
- markinfo.BCCBTC: "BCC_BTC",
- markinfo.ETHUSDT: "ETH_USDT",
- markinfo.ETCUSDT: "ETC_USDT",
- markinfo.ZECUSDT: "ZEC_USDT",
- markinfo.LTCUSDT: "LTC_USDT",
- markinfo.BCCUSDT: "BCC_USDT",
- markinfo.BTCUSDT: "BTC_USDT",
- markinfo.BTYUSDT: "BTY_USDT",
- markinfo.BTSUSDT: "BTS_USDT",
- markinfo.SCUSDT: "SC_USDT",
- markinfo.DCRUSDT: "DCR_USDT",
- markinfo.BNTUSDT: "BNT_USDT",
- markinfo.BNTBTC: "BNT_BTC",
- markinfo.SCTCUSDT: "SCTC_USDT",
- markinfo.SCTCBTC: "SCTC_BTC",
- markinfo.YCCUSDT: "YCC_USDT",
- markinfo.YCCBTC: "YCC_BTC",
- markinfo.YCCETH: "YCC_ETH",
- markinfo.JBUSDT: "JB_USDT",
- markinfo.JBBTC: "JB_BTC",
- markinfo.JBETH: "JB_ETH",
- markinfo.OPTCUSDT: "OPTC_USDT",
- markinfo.OPTCBTC: "OPTC_BTC",
- markinfo.OPTCETH: "OPTC_ETH",
- markinfo.BTCETH: "BTC_ETH",
- markinfo.BCCETH: "BCC_ETH",
- markinfo.ZECETH: "ZEC_ETH",
- markinfo.ETCETH: "ETC_ETH",
- markinfo.LTCETH: "LTC_ETH",
- markinfo.STILT: "ST_ILT",
- markinfo.ITVBUSDT: "ITVB_USDT",
- markinfo.ITVBBTC: "ITVB_BTC",
- markinfo.ITVBETH: "ITVB_ETH",
- markinfo.BTYETH: "BTY_ETH",
- markinfo.FHUSDT: "FH_USDT",
- markinfo.CWVUSDT: "CWV_USDT",
- markinfo.FHETH: "FH_ETH",
- markinfo.TMCETH: "TMC_ETH",
- markinfo.FANSUSDT: "FANS_USDT",
- markinfo.FANSBTC: "FANS_BTC",
- markinfo.FANSETH: "FANS_ETH",
- markinfo.WTBWTC: "WTB_WTC",
- markinfo.CNSUSDT: "CNS_USDT",
- }
- var btyTable = "inss_lastoffsets"
- var last_offset int64
- var bty_reader *TickRead
- // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存
- type BtyDS struct {
- *DSBase
- conf *DsConf
- lastOffsets []int64
- }
- func init() {
- drivers[Bty] = newBtyDS
- }
- func newBtyDS(conf *DsConf) (DataSource, error) {
- err := checkBtyTable()
- if err != nil {
- return nil, err
- }
- bds := &BtyDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- }
- bds.insMap = btyInsMap()
- bds.lastOffsets = make([]int64, len(btyInss))
- for i := 0; i < len(btyInss); i++ {
- symbol, _ := markinfo.SymbolName(btyInss[i])
- lastOffset, err := getLastOffset(Bty, symbol)
- if err != nil {
- return nil, err
- }
- bds.lastOffsets[i] = lastOffset
- }
- bty_reader = NewTickRead()
- return bds, nil
- }
- func (bds *BtyDS) Name() string {
- return Bty
- }
- func (bds *BtyDS) Run() {
- //log.Println("BtyDS.Run")
- for i, offset := range bds.lastOffsets {
- bds.getData(int64(btyInss[i]), offset)
- }
- for {
- mk, err := bty_reader.Read()
- if err != nil {
- fmt.Println("1", err)
- }
- //log.Println("save fuck")
- bds.Save(mk)
- }
- }
- func btyInsMap() map[int64]*Instrument {
- insMap := make(map[int64]*Instrument)
- for _, id := range btyInss {
- x, _ := markinfo.SymbolName(id)
- u, _ := markinfo.SymbolUint(x)
- ins := &Instrument{
- Id: int64(id),
- Name: x,
- ExId: Bty,
- Type: market.Btcs,
- PriceInc: u,
- StartTime: time.Now().Unix() * 1000,
- }
- insMap[int64(id)] = ins
- }
- return insMap
- }
- func (bds *BtyDS) getData(instrumentId, offset int64) {
- //var reader *TickRead
- if instrumentId == markinfo.BTYCNY {
- GetMacTick(instrumentId, offset)
- } else {
- GetEthTick(instrumentId, offset)
- }
- //if instrumentId == markinfo.BTCCNY {
- //GetBtcTick(instrumentId, offset)
- //}
- }
- func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- url := "http://10.0.1.177:80/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- response, err := http.Get(url)
- if err != nil {
- return offset, err
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //fmt.Println("f2", url, err)
- return offset, err
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //fmt.Println("f3", url, err)
- return offset, err
- }
- //fmt.Println(url)
- dataticks, ok := data["result"].([]interface{})
- if !ok {
- //fmt.Println("GetEthTickbyPage1", req, string(body))
- return offset, nil
- }
- total := len(dataticks)
- if total == 0 {
- //log.Println("GetEthTickbyPage2", url, string(body))
- }
- //offset += int64(total)
- for _, tmp := range dataticks {
- ti, id, err := toEthMK(tmp.(map[string]interface{}))
- if err != nil {
- fmt.Println("toMac=", err)
- continue
- }
- ti.InsId = instrumentId
- offset = id
- cb(ti)
- }
- return offset, nil
- }
- /*
- func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
- symbol := inss[int(instrumentId)]
- //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- //response, err := http.Get(url)
- //if err != nil {
- //return offset, err
- //}
- params := make(map[string]interface{})
- params["symbol"] = symbol
- params["offset"] = offset
- params["limit"] = 100
- req := make(map[string]interface{})
- req["jsonrpc"] = "2.0"
- req["method"] = "kline"
- req["id"] = 0
- req["params"] = params
- bytesData, err := json.Marshal(req)
- if err != nil {
- //fmt.Println("2", err)
- return offset, err
- }
- reader := bytes.NewReader(bytesData)
- url := "http://47.91.198.174:45656"
- request, err := http.NewRequest("POST", url, reader)
- if err != nil {
- //fmt.Println("3", err)
- return offset, err
- }
- request.Header.Set("Content-Type", "application/json;charset=UTF-8")
- client := http.Client{}
- response, err := client.Do(request)
- if err != nil {
- //fmt.Println("4", err)
- return offset, err
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //fmt.Println("f2", url, err)
- return offset, err
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //fmt.Println("f3", url, err)
- return offset, err
- }
- //fmt.Println(url)
- dataticks, ok := data["result"].([]interface{})
- if !ok {
- //fmt.Println("GetEthTickbyPage1", req, string(body))
- return offset, nil
- }
- total := len(dataticks)
- if total == 0 {
- //log.Println("GetEthTickbyPage2", url, string(body))
- }
- //offset += int64(total)
- for _, tmp := range dataticks {
- ti, id, err := toEthMK(tmp.(map[string]interface{}))
- if err != nil {
- fmt.Println("toMac=", err)
- continue
- }
- ti.InsId = instrumentId
- offset = id
- cb(ti)
- }
- return offset, nil
- }
- */
- func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
- url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
- response, err := http.Get(url)
- if err != nil {
- //log.Println("k1", url, err)
- return offset, err
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //log.Println("k2", url, err)
- return offset, err
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //log.Println("k3", url, err)
- return offset, err
- }
- //fmt.Println(url)
- dataticks, ok := data["result"].([]interface{})
- if !ok {
- //log.Println("GetBtcTickbyPage1", url, string(body))
- return offset, nil
- }
- total := len(dataticks)
- if total == 0 {
- //log.Println("GetBtcTickbyPage2", url, string(body))
- }
- offset += int64(total)
- for _, tmp := range dataticks {
- ti, err := toBtcMK(tmp.(map[string]interface{}))
- if err != nil {
- fmt.Println("toBtc=", err)
- continue
- }
- cb(ti)
- }
- return offset, nil
- }
- func toEthMK(data map[string]interface{}) (*Market, int64, error) {
- mk := &Market{}
- mk.Type = IntBty
- mk.Timestamp = parseKTime(data["time"].(string))
- price := data["price"].(float64)
- volume, _ := data["quantity"].(float64)
- id := int64(data["id"].(float64))
- price /= (10000 * 10000)
- volume /= 100000000
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.AllAmount = volume
- mk.AllVolume = volume
- mk.LastPrice = price
- mk.LastVolume = volume
- var ask, bid PP
- ask[0] = price
- ask[1] = volume
- bid[0] = price
- bid[1] = volume
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, id, nil
- }
- //func toEthMK(data map[string]interface{}) (*Market, int64, error) {
- //mk := &Market{}
- //mk.Type = IntBty
- /*symbolId := int32(data["symbolid"].(float64))
- switch symbolId {
- case 0x50001:
- mk.InsId = markinfo.ETCCNY
- case 0x20001:
- mk.InsId = markinfo.BTCCNY
- case 0x40001:
- mk.InsId = markinfo.ETHCNY
- case 0x70001:
- mk.InsId = markinfo.SCCNY
- case 0x80001:
- mk.InsId = markinfo.ZECCNY
- case 0x90001:
- mk.InsId = markinfo.BTSCNY
- case 0xA0001:
- mk.InsId = markinfo.LTCCNY
- case 0xB0001:
- mk.InsId = markinfo.BCCCNY
- case 0xD0001:
- mk.InsId = markinfo.NYCCCNY
- case 0xE0001:
- mk.InsId = markinfo.WTCCNY
- case 0x40002:
- mk.InsId = markinfo.ETHBTC
- case 0x50002:
- mk.InsId = markinfo.ETCBTC
- case 0x80002:
- mk.InsId = markinfo.ZECBTC
- case 0xA0002:
- mk.InsId = markinfo.LTCBTC
- case 0xB0002:
- mk.InsId = markinfo.BCCBTC
- case 0x4000F:
- mk.InsId = markinfo.ETHUSDT
- case 0x5000F:
- mk.InsId = markinfo.ETCUSDT
- case 0x8000F:
- mk.InsId = markinfo.ZECUSDT
- case 0xA000F:
- mk.InsId = markinfo.LTCUSDT
- case 0xB000F:
- mk.InsId = markinfo.BCCUSDT
- case 0x2000F:
- mk.InsId = markinfo.BTCUSDT
- case 0x3000F:
- mk.InsId = markinfo.BTYUSDT
- case 0x9000F:
- mk.InsId = markinfo.BTSUSDT
- case 0x7000F:
- mk.InsId = markinfo.SCUSDT
- case 0x30002:
- mk.InsId = markinfo.BTYBTC
- case 0x90002:
- mk.InsId = markinfo.BTSBTC
- case 0x70002:
- mk.InsId = markinfo.SCBTC
- case 0xC000F:
- mk.InsId = markinfo.YCCUSDT
- case 0x11000F:
- mk.InsId = markinfo.DCRUSDT
- case 0x10000F:
- mk.InsId = markinfo.BNTUSDT
- case 0x100002:
- mk.InsId = markinfo.BNTBTC
- case 0x12000F:
- mk.InsId = markinfo.SCTCUSDT
- case 0x120002:
- mk.InsId = markinfo.SCTCBTC
- case 0xC0002:
- mk.InsId = markinfo.YCCBTC
- case 0xC0004:
- mk.InsId = markinfo.YCCETH
- case 0x15000F:
- mk.InsId = markinfo.JBUSDT
- case 0x150002:
- mk.InsId = markinfo.JBBTC
- case 0x150004:
- mk.InsId = markinfo.JBETH
- case 0x16000F:
- mk.InsId = markinfo.OPTCUSDT
- case 0x160002:
- mk.InsId = markinfo.OPTCBTC
- case 0x160004:
- mk.InsId = markinfo.OPTCETH
- case 0x20004:
- mk.InsId = markinfo.BTCETH
- case 0xB0004:
- mk.InsId = markinfo.BCCETH
- case 0x80004:
- mk.InsId = markinfo.ZECETH
- case 0x50004:
- mk.InsId = markinfo.ETCETH
- case 0xA0004:
- mk.InsId = markinfo.LTCETH
- case 0x130014:
- mk.InsId = markinfo.STILT
- case 0x17000F:
- mk.InsId = markinfo.ITVBUSDT
- case 0x170002:
- mk.InsId = markinfo.ITVBBTC
- case 0x170004:
- mk.InsId = markinfo.ITVBETH
- case 0x30004:
- mk.InsId = markinfo.BTYETH
- case 0x1A000F:
- mk.InsId = markinfo.FHUSDT
- case 0x1B000F:
- mk.InsId = markinfo.CWVUSDT
- case 0x1A0004:
- mk.InsId = markinfo.FHETH
- case 0x1C0004:
- mk.InsId = markinfo.TMCETH
- case 0x1D000F:
- mk.InsId = markinfo.FANSUSDT
- case 0x1D0002:
- mk.InsId = markinfo.FANSBTC
- case 0x1D0004:
- mk.InsId = markinfo.FANSETH
- case 0x1F0020:
- mk.InsId = markinfo.WTBWTC
- case 0x21000F:
- mk.InsId = markinfo.CNSUSDT
- default:
- return nil, 0, errors.New("invalid symbolid")
- }
- mk.Timestamp = parseKTime(data["time"].(string))
- price := data["price"].(float64)
- volume, _ := data["quantity"].(float64)
- id := int64(data["id"].(float64))
- price /= (10000 * 10000)
- volume /= 100000000
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.AllAmount = volume
- mk.AllVolume = volume
- mk.LastPrice = price
- mk.LastVolume = volume
- var ask, bid PP
- ask[0] = price
- ask[1] = volume
- bid[0] = price
- bid[1] = volume
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)*/
- //mk.Timestamp = int64(data["time"].(float64)) * 1000
- //price := data["price"].(float64)
- //volume, _ := data["amount"].(float64)
- //id := int64(data["id"].(float64))
- //price /= (10000 * 10000)
- //volume /= 100000000
- //mk.Close = price
- //mk.Open = price
- //mk.High = price
- //mk.Low = price
- //mk.AllAmount = volume
- //mk.AllVolume = volume
- //mk.LastPrice = price
- //mk.LastVolume = volume
- //var ask, bid PP
- //ask[0] = price
- //ask[1] = volume
- //bid[0] = price
- //bid[1] = volume
- //mk.Asks = append(mk.Asks, ask)
- //mk.Bids = append(mk.Bids, bid)
- //return mk, id, nil
- //}
// kTimeLocation is the zone in which kline "time" strings are interpreted.
// It is resolved once at package initialisation instead of on every call.
var kTimeLocation = loadKTimeLocation()

// loadKTimeLocation returns the "Asia/Chongqing" zone, falling back to a
// fixed UTC+8 zone when the zone database is not available on the host.
func loadKTimeLocation() *time.Location {
	if loc, err := time.LoadLocation("Asia/Chongqing"); err == nil {
		return loc
	}
	// Without this fallback a nil location would make time.Date panic.
	return time.FixedZone("CST", 8*60*60)
}

// parseKTime converts a "YYYY-MM-DD hh:mm:ss" wall-clock string, read in
// the Asia/Chongqing zone, into Unix milliseconds.
//
// Scan errors are deliberately ignored, as before: fields that could not be
// read stay zero, so a malformed string yields an out-of-range timestamp
// rather than an error (the signature has no error result).
func parseKTime(timeStr string) int64 {
	var year, month, day, hour, minute, second int
	fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second)
	t := time.Date(year, time.Month(month), day, hour, minute, second, 0, kTimeLocation)
	return t.Unix() * 1000
}
- func toBtcMK(data map[string]interface{}) (*Market, error) {
- mk := &Market{}
- mk.Type = IntBty
- mk.InsId = markinfo.BTCCNY
- tick_time := data["time"]
- f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
- mk.Timestamp = int64(f_tick_time / (1e6))
- price := data["price"].(float64)
- volume := data["quantity"].(float64)
- price /= (100 * 10000)
- //volume /= 100
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.AllAmount = volume
- mk.AllVolume = volume
- mk.LastPrice = price
- mk.LastVolume = volume
- var ask, bid PP
- ask[0] = price
- ask[1] = volume
- bid[0] = price
- bid[1] = volume
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, nil
- }
- type TickRead struct {
- ch chan *Market
- err chan error
- }
- func NewTickRead() *TickRead {
- ch := make(chan *Market, 1024)
- errch := make(chan error)
- reader := &TickRead{}
- reader.ch = ch
- reader.err = errch
- return reader
- }
- func (tr *TickRead) Read() (*Market, error) {
- tick := <-tr.ch
- if tick == nil {
- return nil, <-tr.err
- }
- return tick, nil
- }
- func GetEthTick(instrumentId, offset int64) error {
- //reader := NewTickRead()
- lasttime := int64(0)
- go func() {
- for {
- offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
- if mk != nil {
- if mk.Timestamp >= lasttime {
- bty_reader.ch <- mk
- lasttime = mk.Timestamp
- }
- }
- })
- time.Sleep(time.Second)
- if err != nil {
- //log.Println("GetEthTick", err, offset)
- continue
- }
- if offset < offset_next {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, offset_next)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
- offset = offset_next
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- func GetBtcTick(instrumentId, offset int64) error {
- //reader := NewTickRead()
- lasttime := int64(0)
- go func() {
- for {
- //offset, _ = getLastOffset(Bty, "BTCCNY")
- offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
- if mk != nil {
- if mk.Timestamp >= lasttime {
- bty_reader.ch <- mk
- lasttime = mk.Timestamp
- }
- }
- })
- time.Sleep(time.Second)
- if err != nil {
- //log.Println("GetBtcTick", err, offset)
- continue
- }
- if offset < offset_next {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, offset_next)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
- offset = offset_next
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- /*创建数据表*/
- func checkBtyTable() error {
- sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
- _, err := db.Exec(sql)
- if err != nil {
- return err
- }
- return nil
- }
- func getLastOffset(ty, insId string) (lastOffset int64, err error) {
- szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
- row := db.QueryRow(szSelectTable)
- err = row.Scan(&lastOffset)
- if err == sql.ErrNoRows {
- q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
- _, err = db.Exec(q)
- if err != nil {
- fmt.Println("getLastOffset", err)
- return lastOffset, err
- }
- return lastOffset, nil
- }
- return lastOffset, err
- }
- func updateLastoffset(typ, insId string, lastOffset int64) error {
- //INSERT INTO
- q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
- _, err := db.Exec(q)
- if err != nil {
- fmt.Println("updateLastoffset", err)
- }
- return err
- }
- func GetMacTick(instrumentId, ntime int64) error {
- //reader := NewTickRead()
- go func() {
- for {
- err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
- if mk != nil {
- bty_reader.ch <- mk
- }
- })
- time.Sleep(time.Second)
- //fmt.Println(err, ntime, isEndPage)
- if err != nil {
- //log.Println("GetMacTick", err)
- continue
- }
- if ntime < _ntime {
- symbol, _ := markinfo.SymbolName(int(instrumentId))
- err := updateLastoffset(Bty, symbol, _ntime)
- if err != nil {
- continue
- }
- //time.Sleep(time.Duration(total/100) * time.Second)
- ntime = _ntime
- }
- //if !isEndPage {
- //continue
- //}
- }
- }()
- return nil
- }
- func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
- url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
- //fmt.Println(url)
- response, err := http.Get(url)
- if err != nil {
- //fmt.Println("macoin", err)
- return err, 0, 0
- }
- defer response.Body.Close()
- body, err := ioutil.ReadAll(response.Body)
- if err != nil {
- //fmt.Println(err)
- return err, 0, 0
- }
- var data map[string]interface{}
- err = json.Unmarshal(body, &data)
- if err != nil {
- //fmt.Println("json Unmarshal", err)
- return err, 0, 0
- }
- total, err := strconv.ParseInt(data["total"].(string), 10, 32)
- if err != nil {
- //fmt.Println("ParseInt total ", err)
- return err, 0, 0
- }
- dataticks := data["data"].([]interface{})
- ntime := int64(0)
- for _, tmp := range dataticks {
- mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
- if mk == nil || err != nil {
- continue
- }
- ntime = (mk.Timestamp / 1000) + 1
- if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
- continue
- }
- cb(mk)
- }
- return nil, ntime, total
- }
- func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
- mk := &Market{}
- mk.InsId = instrumentId
- mk.Type = IntBty
- timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
- if err != nil {
- return nil, err
- }
- match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
- if err != nil {
- return nil, err
- }
- if last_offset > int64(match_id) {
- return nil, nil
- }
- last_offset = int64(match_id)
- mk.Timestamp = timestamp * 1000
- // value
- var datavalue map[string]interface{}
- var datasell map[string]interface{}
- err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
- if err != nil {
- return nil, err
- }
- if len(datavalue["sell"].([]interface{})) == 0 {
- return mk, nil
- }
- datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
- price, err := strconv.ParseFloat(datasell["price"].(string), 64)
- if err != nil {
- return nil, err
- }
- price /= 1000
- volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
- if err != nil {
- return nil, err
- }
- if volume < 0 {
- volume = -volume
- }
- mk.AllAmount = float64(volume)
- mk.AllVolume = float64(volume)
- mk.Close = price
- mk.Open = price
- mk.High = price
- mk.Low = price
- mk.LastPrice = price
- mk.LastVolume = float64(volume)
- var ask, bid PP
- ask[0] = price
- ask[1] = float64(volume)
- bid[0] = price
- bid[1] = float64(volume)
- mk.Asks = append(mk.Asks, ask)
- mk.Bids = append(mk.Bids, bid)
- return mk, nil
- }
|