ds_bty.go 20 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "database/sql"
  6. "encoding/json"
  7. //"errors"
  8. "fmt"
  9. "io/ioutil"
  10. //"log"
  11. //"bytes"
  12. "net/http"
  13. "strconv"
  14. "time"
  15. "tickserver/markinfo"
  16. "tickserver/server/market"
  17. _ "github.com/go-sql-driver/mysql"
  18. )
  19. var btyInss = []int{
  20. markinfo.ETHBTC,
  21. markinfo.ETCBTC,
  22. markinfo.ZECBTC,
  23. markinfo.LTCBTC,
  24. markinfo.BCCBTC,
  25. markinfo.ETHUSDT,
  26. markinfo.ETCUSDT,
  27. markinfo.ZECUSDT,
  28. markinfo.LTCUSDT,
  29. markinfo.BCCUSDT,
  30. markinfo.BTCUSDT,
  31. markinfo.BTYUSDT,
  32. markinfo.BTSUSDT,
  33. markinfo.SCUSDT,
  34. markinfo.DCRUSDT,
  35. markinfo.BNTUSDT,
  36. markinfo.BNTBTC,
  37. markinfo.SCTCUSDT,
  38. markinfo.SCTCBTC,
  39. markinfo.YCCUSDT,
  40. markinfo.YCCBTC,
  41. markinfo.YCCETH,
  42. markinfo.JBUSDT,
  43. markinfo.JBBTC,
  44. markinfo.JBETH,
  45. markinfo.OPTCUSDT,
  46. markinfo.OPTCBTC,
  47. markinfo.OPTCETH,
  48. markinfo.BTCETH,
  49. markinfo.BCCETH,
  50. markinfo.ZECETH,
  51. markinfo.ETCETH,
  52. markinfo.LTCETH,
  53. markinfo.STILT,
  54. markinfo.ITVBUSDT,
  55. markinfo.ITVBBTC,
  56. markinfo.ITVBETH,
  57. markinfo.BTYETH,
  58. markinfo.FHUSDT,
  59. markinfo.CWVUSDT,
  60. markinfo.FHETH,
  61. markinfo.TMCETH,
  62. markinfo.FANSUSDT,
  63. markinfo.FANSBTC,
  64. markinfo.FANSETH,
  65. markinfo.WTBWTC,
  66. markinfo.CNSUSDT,
  67. }
  68. var inss = map[int]string{
  69. markinfo.ETHBTC: "ETH_BTC",
  70. markinfo.ETCBTC: "ETC_BTC",
  71. markinfo.ZECBTC: "ZEC_BTC",
  72. markinfo.LTCBTC: "LTC_BTC",
  73. markinfo.BCCBTC: "BCC_BTC",
  74. markinfo.ETHUSDT: "ETH_USDT",
  75. markinfo.ETCUSDT: "ETC_USDT",
  76. markinfo.ZECUSDT: "ZEC_USDT",
  77. markinfo.LTCUSDT: "LTC_USDT",
  78. markinfo.BCCUSDT: "BCC_USDT",
  79. markinfo.BTCUSDT: "BTC_USDT",
  80. markinfo.BTYUSDT: "BTY_USDT",
  81. markinfo.BTSUSDT: "BTS_USDT",
  82. markinfo.SCUSDT: "SC_USDT",
  83. markinfo.DCRUSDT: "DCR_USDT",
  84. markinfo.BNTUSDT: "BNT_USDT",
  85. markinfo.BNTBTC: "BNT_BTC",
  86. markinfo.SCTCUSDT: "SCTC_USDT",
  87. markinfo.SCTCBTC: "SCTC_BTC",
  88. markinfo.YCCUSDT: "YCC_USDT",
  89. markinfo.YCCBTC: "YCC_BTC",
  90. markinfo.YCCETH: "YCC_ETH",
  91. markinfo.JBUSDT: "JB_USDT",
  92. markinfo.JBBTC: "JB_BTC",
  93. markinfo.JBETH: "JB_ETH",
  94. markinfo.OPTCUSDT: "OPTC_USDT",
  95. markinfo.OPTCBTC: "OPTC_BTC",
  96. markinfo.OPTCETH: "OPTC_ETH",
  97. markinfo.BTCETH: "BTC_ETH",
  98. markinfo.BCCETH: "BCC_ETH",
  99. markinfo.ZECETH: "ZEC_ETH",
  100. markinfo.ETCETH: "ETC_ETH",
  101. markinfo.LTCETH: "LTC_ETH",
  102. markinfo.STILT: "ST_ILT",
  103. markinfo.ITVBUSDT: "ITVB_USDT",
  104. markinfo.ITVBBTC: "ITVB_BTC",
  105. markinfo.ITVBETH: "ITVB_ETH",
  106. markinfo.BTYETH: "BTY_ETH",
  107. markinfo.FHUSDT: "FH_USDT",
  108. markinfo.CWVUSDT: "CWV_USDT",
  109. markinfo.FHETH: "FH_ETH",
  110. markinfo.TMCETH: "TMC_ETH",
  111. markinfo.FANSUSDT: "FANS_USDT",
  112. markinfo.FANSBTC: "FANS_BTC",
  113. markinfo.FANSETH: "FANS_ETH",
  114. markinfo.WTBWTC: "WTB_WTC",
  115. markinfo.CNSUSDT: "CNS_USDT",
  116. }
  117. var btyTable = "inss_lastoffsets"
  118. var last_offset int64
  119. var bty_reader *TickRead
  120. // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存
  121. type BtyDS struct {
  122. *DSBase
  123. conf *DsConf
  124. lastOffsets []int64
  125. }
  126. func init() {
  127. drivers[Bty] = newBtyDS
  128. }
  129. func newBtyDS(conf *DsConf) (DataSource, error) {
  130. err := checkBtyTable()
  131. if err != nil {
  132. return nil, err
  133. }
  134. bds := &BtyDS{
  135. DSBase: NewDsBase(conf),
  136. conf: conf,
  137. }
  138. bds.insMap = btyInsMap()
  139. bds.lastOffsets = make([]int64, len(btyInss))
  140. for i := 0; i < len(btyInss); i++ {
  141. symbol, _ := markinfo.SymbolName(btyInss[i])
  142. lastOffset, err := getLastOffset(Bty, symbol)
  143. if err != nil {
  144. return nil, err
  145. }
  146. bds.lastOffsets[i] = lastOffset
  147. }
  148. bty_reader = NewTickRead()
  149. return bds, nil
  150. }
  151. func (bds *BtyDS) Name() string {
  152. return Bty
  153. }
  154. func (bds *BtyDS) Run() {
  155. //log.Println("BtyDS.Run")
  156. for i, offset := range bds.lastOffsets {
  157. bds.getData(int64(btyInss[i]), offset)
  158. }
  159. for {
  160. mk, err := bty_reader.Read()
  161. if err != nil {
  162. fmt.Println("1", err)
  163. }
  164. //log.Println("save fuck")
  165. bds.Save(mk)
  166. }
  167. }
  168. func btyInsMap() map[int64]*Instrument {
  169. insMap := make(map[int64]*Instrument)
  170. for _, id := range btyInss {
  171. x, _ := markinfo.SymbolName(id)
  172. u, _ := markinfo.SymbolUint(x)
  173. ins := &Instrument{
  174. Id: int64(id),
  175. Name: x,
  176. ExId: Bty,
  177. Type: market.Btcs,
  178. PriceInc: u,
  179. StartTime: time.Now().Unix() * 1000,
  180. }
  181. insMap[int64(id)] = ins
  182. }
  183. return insMap
  184. }
  185. func (bds *BtyDS) getData(instrumentId, offset int64) {
  186. //var reader *TickRead
  187. if instrumentId == markinfo.BTYCNY {
  188. GetMacTick(instrumentId, offset)
  189. } else {
  190. GetEthTick(instrumentId, offset)
  191. }
  192. //if instrumentId == markinfo.BTCCNY {
  193. //GetBtcTick(instrumentId, offset)
  194. //}
  195. }
  196. func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
  197. symbol, _ := markinfo.SymbolName(int(instrumentId))
  198. //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  199. //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  200. url := "http://10.0.1.177:80/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  201. response, err := http.Get(url)
  202. if err != nil {
  203. return offset, err
  204. }
  205. defer response.Body.Close()
  206. body, err := ioutil.ReadAll(response.Body)
  207. if err != nil {
  208. //fmt.Println("f2", url, err)
  209. return offset, err
  210. }
  211. var data map[string]interface{}
  212. err = json.Unmarshal(body, &data)
  213. if err != nil {
  214. //fmt.Println("f3", url, err)
  215. return offset, err
  216. }
  217. //fmt.Println(url)
  218. dataticks, ok := data["result"].([]interface{})
  219. if !ok {
  220. //fmt.Println("GetEthTickbyPage1", req, string(body))
  221. return offset, nil
  222. }
  223. total := len(dataticks)
  224. if total == 0 {
  225. //log.Println("GetEthTickbyPage2", url, string(body))
  226. }
  227. //offset += int64(total)
  228. for _, tmp := range dataticks {
  229. ti, id, err := toEthMK(tmp.(map[string]interface{}))
  230. if err != nil {
  231. fmt.Println("toMac=", err)
  232. continue
  233. }
  234. ti.InsId = instrumentId
  235. offset = id
  236. cb(ti)
  237. }
  238. return offset, nil
  239. }
  240. /*
  241. func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
  242. symbol := inss[int(instrumentId)]
  243. //url := "http://122.224.124.250:10115/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  244. //url := "http://10.0.1.5:45659/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  245. //response, err := http.Get(url)
  246. //if err != nil {
  247. //return offset, err
  248. //}
  249. params := make(map[string]interface{})
  250. params["symbol"] = symbol
  251. params["offset"] = offset
  252. params["limit"] = 100
  253. req := make(map[string]interface{})
  254. req["jsonrpc"] = "2.0"
  255. req["method"] = "kline"
  256. req["id"] = 0
  257. req["params"] = params
  258. bytesData, err := json.Marshal(req)
  259. if err != nil {
  260. //fmt.Println("2", err)
  261. return offset, err
  262. }
  263. reader := bytes.NewReader(bytesData)
  264. url := "http://47.91.198.174:45656"
  265. request, err := http.NewRequest("POST", url, reader)
  266. if err != nil {
  267. //fmt.Println("3", err)
  268. return offset, err
  269. }
  270. request.Header.Set("Content-Type", "application/json;charset=UTF-8")
  271. client := http.Client{}
  272. response, err := client.Do(request)
  273. if err != nil {
  274. //fmt.Println("4", err)
  275. return offset, err
  276. }
  277. defer response.Body.Close()
  278. body, err := ioutil.ReadAll(response.Body)
  279. if err != nil {
  280. //fmt.Println("f2", url, err)
  281. return offset, err
  282. }
  283. var data map[string]interface{}
  284. err = json.Unmarshal(body, &data)
  285. if err != nil {
  286. //fmt.Println("f3", url, err)
  287. return offset, err
  288. }
  289. //fmt.Println(url)
  290. dataticks, ok := data["result"].([]interface{})
  291. if !ok {
  292. //fmt.Println("GetEthTickbyPage1", req, string(body))
  293. return offset, nil
  294. }
  295. total := len(dataticks)
  296. if total == 0 {
  297. //log.Println("GetEthTickbyPage2", url, string(body))
  298. }
  299. //offset += int64(total)
  300. for _, tmp := range dataticks {
  301. ti, id, err := toEthMK(tmp.(map[string]interface{}))
  302. if err != nil {
  303. fmt.Println("toMac=", err)
  304. continue
  305. }
  306. ti.InsId = instrumentId
  307. offset = id
  308. cb(ti)
  309. }
  310. return offset, nil
  311. }
  312. */
  313. func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
  314. url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  315. response, err := http.Get(url)
  316. if err != nil {
  317. //log.Println("k1", url, err)
  318. return offset, err
  319. }
  320. defer response.Body.Close()
  321. body, err := ioutil.ReadAll(response.Body)
  322. if err != nil {
  323. //log.Println("k2", url, err)
  324. return offset, err
  325. }
  326. var data map[string]interface{}
  327. err = json.Unmarshal(body, &data)
  328. if err != nil {
  329. //log.Println("k3", url, err)
  330. return offset, err
  331. }
  332. //fmt.Println(url)
  333. dataticks, ok := data["result"].([]interface{})
  334. if !ok {
  335. //log.Println("GetBtcTickbyPage1", url, string(body))
  336. return offset, nil
  337. }
  338. total := len(dataticks)
  339. if total == 0 {
  340. //log.Println("GetBtcTickbyPage2", url, string(body))
  341. }
  342. offset += int64(total)
  343. for _, tmp := range dataticks {
  344. ti, err := toBtcMK(tmp.(map[string]interface{}))
  345. if err != nil {
  346. fmt.Println("toBtc=", err)
  347. continue
  348. }
  349. cb(ti)
  350. }
  351. return offset, nil
  352. }
  353. func toEthMK(data map[string]interface{}) (*Market, int64, error) {
  354. mk := &Market{}
  355. mk.Type = IntBty
  356. mk.Timestamp = parseKTime(data["time"].(string))
  357. price := data["price"].(float64)
  358. volume, _ := data["quantity"].(float64)
  359. id := int64(data["id"].(float64))
  360. price /= (10000 * 10000)
  361. volume /= 100000000
  362. mk.Close = price
  363. mk.Open = price
  364. mk.High = price
  365. mk.Low = price
  366. mk.AllAmount = volume
  367. mk.AllVolume = volume
  368. mk.LastPrice = price
  369. mk.LastVolume = volume
  370. var ask, bid PP
  371. ask[0] = price
  372. ask[1] = volume
  373. bid[0] = price
  374. bid[1] = volume
  375. mk.Asks = append(mk.Asks, ask)
  376. mk.Bids = append(mk.Bids, bid)
  377. return mk, id, nil
  378. }
  379. //func toEthMK(data map[string]interface{}) (*Market, int64, error) {
  380. //mk := &Market{}
  381. //mk.Type = IntBty
  382. /*symbolId := int32(data["symbolid"].(float64))
  383. switch symbolId {
  384. case 0x50001:
  385. mk.InsId = markinfo.ETCCNY
  386. case 0x20001:
  387. mk.InsId = markinfo.BTCCNY
  388. case 0x40001:
  389. mk.InsId = markinfo.ETHCNY
  390. case 0x70001:
  391. mk.InsId = markinfo.SCCNY
  392. case 0x80001:
  393. mk.InsId = markinfo.ZECCNY
  394. case 0x90001:
  395. mk.InsId = markinfo.BTSCNY
  396. case 0xA0001:
  397. mk.InsId = markinfo.LTCCNY
  398. case 0xB0001:
  399. mk.InsId = markinfo.BCCCNY
  400. case 0xD0001:
  401. mk.InsId = markinfo.NYCCCNY
  402. case 0xE0001:
  403. mk.InsId = markinfo.WTCCNY
  404. case 0x40002:
  405. mk.InsId = markinfo.ETHBTC
  406. case 0x50002:
  407. mk.InsId = markinfo.ETCBTC
  408. case 0x80002:
  409. mk.InsId = markinfo.ZECBTC
  410. case 0xA0002:
  411. mk.InsId = markinfo.LTCBTC
  412. case 0xB0002:
  413. mk.InsId = markinfo.BCCBTC
  414. case 0x4000F:
  415. mk.InsId = markinfo.ETHUSDT
  416. case 0x5000F:
  417. mk.InsId = markinfo.ETCUSDT
  418. case 0x8000F:
  419. mk.InsId = markinfo.ZECUSDT
  420. case 0xA000F:
  421. mk.InsId = markinfo.LTCUSDT
  422. case 0xB000F:
  423. mk.InsId = markinfo.BCCUSDT
  424. case 0x2000F:
  425. mk.InsId = markinfo.BTCUSDT
  426. case 0x3000F:
  427. mk.InsId = markinfo.BTYUSDT
  428. case 0x9000F:
  429. mk.InsId = markinfo.BTSUSDT
  430. case 0x7000F:
  431. mk.InsId = markinfo.SCUSDT
  432. case 0x30002:
  433. mk.InsId = markinfo.BTYBTC
  434. case 0x90002:
  435. mk.InsId = markinfo.BTSBTC
  436. case 0x70002:
  437. mk.InsId = markinfo.SCBTC
  438. case 0xC000F:
  439. mk.InsId = markinfo.YCCUSDT
  440. case 0x11000F:
  441. mk.InsId = markinfo.DCRUSDT
  442. case 0x10000F:
  443. mk.InsId = markinfo.BNTUSDT
  444. case 0x100002:
  445. mk.InsId = markinfo.BNTBTC
  446. case 0x12000F:
  447. mk.InsId = markinfo.SCTCUSDT
  448. case 0x120002:
  449. mk.InsId = markinfo.SCTCBTC
  450. case 0xC0002:
  451. mk.InsId = markinfo.YCCBTC
  452. case 0xC0004:
  453. mk.InsId = markinfo.YCCETH
  454. case 0x15000F:
  455. mk.InsId = markinfo.JBUSDT
  456. case 0x150002:
  457. mk.InsId = markinfo.JBBTC
  458. case 0x150004:
  459. mk.InsId = markinfo.JBETH
  460. case 0x16000F:
  461. mk.InsId = markinfo.OPTCUSDT
  462. case 0x160002:
  463. mk.InsId = markinfo.OPTCBTC
  464. case 0x160004:
  465. mk.InsId = markinfo.OPTCETH
  466. case 0x20004:
  467. mk.InsId = markinfo.BTCETH
  468. case 0xB0004:
  469. mk.InsId = markinfo.BCCETH
  470. case 0x80004:
  471. mk.InsId = markinfo.ZECETH
  472. case 0x50004:
  473. mk.InsId = markinfo.ETCETH
  474. case 0xA0004:
  475. mk.InsId = markinfo.LTCETH
  476. case 0x130014:
  477. mk.InsId = markinfo.STILT
  478. case 0x17000F:
  479. mk.InsId = markinfo.ITVBUSDT
  480. case 0x170002:
  481. mk.InsId = markinfo.ITVBBTC
  482. case 0x170004:
  483. mk.InsId = markinfo.ITVBETH
  484. case 0x30004:
  485. mk.InsId = markinfo.BTYETH
  486. case 0x1A000F:
  487. mk.InsId = markinfo.FHUSDT
  488. case 0x1B000F:
  489. mk.InsId = markinfo.CWVUSDT
  490. case 0x1A0004:
  491. mk.InsId = markinfo.FHETH
  492. case 0x1C0004:
  493. mk.InsId = markinfo.TMCETH
  494. case 0x1D000F:
  495. mk.InsId = markinfo.FANSUSDT
  496. case 0x1D0002:
  497. mk.InsId = markinfo.FANSBTC
  498. case 0x1D0004:
  499. mk.InsId = markinfo.FANSETH
  500. case 0x1F0020:
  501. mk.InsId = markinfo.WTBWTC
  502. case 0x21000F:
  503. mk.InsId = markinfo.CNSUSDT
  504. default:
  505. return nil, 0, errors.New("invalid symbolid")
  506. }
  507. mk.Timestamp = parseKTime(data["time"].(string))
  508. price := data["price"].(float64)
  509. volume, _ := data["quantity"].(float64)
  510. id := int64(data["id"].(float64))
  511. price /= (10000 * 10000)
  512. volume /= 100000000
  513. mk.Close = price
  514. mk.Open = price
  515. mk.High = price
  516. mk.Low = price
  517. mk.AllAmount = volume
  518. mk.AllVolume = volume
  519. mk.LastPrice = price
  520. mk.LastVolume = volume
  521. var ask, bid PP
  522. ask[0] = price
  523. ask[1] = volume
  524. bid[0] = price
  525. bid[1] = volume
  526. mk.Asks = append(mk.Asks, ask)
  527. mk.Bids = append(mk.Bids, bid)*/
  528. //mk.Timestamp = int64(data["time"].(float64)) * 1000
  529. //price := data["price"].(float64)
  530. //volume, _ := data["amount"].(float64)
  531. //id := int64(data["id"].(float64))
  532. //price /= (10000 * 10000)
  533. //volume /= 100000000
  534. //mk.Close = price
  535. //mk.Open = price
  536. //mk.High = price
  537. //mk.Low = price
  538. //mk.AllAmount = volume
  539. //mk.AllVolume = volume
  540. //mk.LastPrice = price
  541. //mk.LastVolume = volume
  542. //var ask, bid PP
  543. //ask[0] = price
  544. //ask[1] = volume
  545. //bid[0] = price
  546. //bid[1] = volume
  547. //mk.Asks = append(mk.Asks, ask)
  548. //mk.Bids = append(mk.Bids, bid)
  549. //return mk, id, nil
  550. //}
  551. func parseKTime(timeStr string) int64 {
  552. loc, _ := time.LoadLocation("Asia/Chongqing")
  553. var year, month, day, hour, minute, second int
  554. fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second)
  555. t := time.Date(year, time.Month(month), day, hour, minute, second, 0, loc)
  556. return t.Unix() * 1000
  557. }
  558. func toBtcMK(data map[string]interface{}) (*Market, error) {
  559. mk := &Market{}
  560. mk.Type = IntBty
  561. mk.InsId = markinfo.BTCCNY
  562. tick_time := data["time"]
  563. f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
  564. mk.Timestamp = int64(f_tick_time / (1e6))
  565. price := data["price"].(float64)
  566. volume := data["quantity"].(float64)
  567. price /= (100 * 10000)
  568. //volume /= 100
  569. mk.Close = price
  570. mk.Open = price
  571. mk.High = price
  572. mk.Low = price
  573. mk.AllAmount = volume
  574. mk.AllVolume = volume
  575. mk.LastPrice = price
  576. mk.LastVolume = volume
  577. var ask, bid PP
  578. ask[0] = price
  579. ask[1] = volume
  580. bid[0] = price
  581. bid[1] = volume
  582. mk.Asks = append(mk.Asks, ask)
  583. mk.Bids = append(mk.Bids, bid)
  584. return mk, nil
  585. }
  586. type TickRead struct {
  587. ch chan *Market
  588. err chan error
  589. }
  590. func NewTickRead() *TickRead {
  591. ch := make(chan *Market, 1024)
  592. errch := make(chan error)
  593. reader := &TickRead{}
  594. reader.ch = ch
  595. reader.err = errch
  596. return reader
  597. }
  598. func (tr *TickRead) Read() (*Market, error) {
  599. tick := <-tr.ch
  600. if tick == nil {
  601. return nil, <-tr.err
  602. }
  603. return tick, nil
  604. }
  605. func GetEthTick(instrumentId, offset int64) error {
  606. //reader := NewTickRead()
  607. lasttime := int64(0)
  608. go func() {
  609. for {
  610. offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
  611. if mk != nil {
  612. if mk.Timestamp >= lasttime {
  613. bty_reader.ch <- mk
  614. lasttime = mk.Timestamp
  615. }
  616. }
  617. })
  618. time.Sleep(time.Second)
  619. if err != nil {
  620. //log.Println("GetEthTick", err, offset)
  621. continue
  622. }
  623. if offset < offset_next {
  624. symbol, _ := markinfo.SymbolName(int(instrumentId))
  625. err := updateLastoffset(Bty, symbol, offset_next)
  626. if err != nil {
  627. continue
  628. }
  629. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  630. offset = offset_next
  631. }
  632. //if !isEndPage {
  633. //continue
  634. //}
  635. }
  636. }()
  637. return nil
  638. }
  639. func GetBtcTick(instrumentId, offset int64) error {
  640. //reader := NewTickRead()
  641. lasttime := int64(0)
  642. go func() {
  643. for {
  644. //offset, _ = getLastOffset(Bty, "BTCCNY")
  645. offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
  646. if mk != nil {
  647. if mk.Timestamp >= lasttime {
  648. bty_reader.ch <- mk
  649. lasttime = mk.Timestamp
  650. }
  651. }
  652. })
  653. time.Sleep(time.Second)
  654. if err != nil {
  655. //log.Println("GetBtcTick", err, offset)
  656. continue
  657. }
  658. if offset < offset_next {
  659. symbol, _ := markinfo.SymbolName(int(instrumentId))
  660. err := updateLastoffset(Bty, symbol, offset_next)
  661. if err != nil {
  662. continue
  663. }
  664. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  665. offset = offset_next
  666. }
  667. //if !isEndPage {
  668. //continue
  669. //}
  670. }
  671. }()
  672. return nil
  673. }
  674. /*创建数据表*/
  675. func checkBtyTable() error {
  676. sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
  677. _, err := db.Exec(sql)
  678. if err != nil {
  679. return err
  680. }
  681. return nil
  682. }
  683. func getLastOffset(ty, insId string) (lastOffset int64, err error) {
  684. szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
  685. row := db.QueryRow(szSelectTable)
  686. err = row.Scan(&lastOffset)
  687. if err == sql.ErrNoRows {
  688. q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
  689. _, err = db.Exec(q)
  690. if err != nil {
  691. fmt.Println("getLastOffset", err)
  692. return lastOffset, err
  693. }
  694. return lastOffset, nil
  695. }
  696. return lastOffset, err
  697. }
  698. func updateLastoffset(typ, insId string, lastOffset int64) error {
  699. //INSERT INTO
  700. q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
  701. _, err := db.Exec(q)
  702. if err != nil {
  703. fmt.Println("updateLastoffset", err)
  704. }
  705. return err
  706. }
  707. func GetMacTick(instrumentId, ntime int64) error {
  708. //reader := NewTickRead()
  709. go func() {
  710. for {
  711. err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
  712. if mk != nil {
  713. bty_reader.ch <- mk
  714. }
  715. })
  716. time.Sleep(time.Second)
  717. //fmt.Println(err, ntime, isEndPage)
  718. if err != nil {
  719. //log.Println("GetMacTick", err)
  720. continue
  721. }
  722. if ntime < _ntime {
  723. symbol, _ := markinfo.SymbolName(int(instrumentId))
  724. err := updateLastoffset(Bty, symbol, _ntime)
  725. if err != nil {
  726. continue
  727. }
  728. //time.Sleep(time.Duration(total/100) * time.Second)
  729. ntime = _ntime
  730. }
  731. //if !isEndPage {
  732. //continue
  733. //}
  734. }
  735. }()
  736. return nil
  737. }
  738. func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
  739. url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
  740. //fmt.Println(url)
  741. response, err := http.Get(url)
  742. if err != nil {
  743. //fmt.Println("macoin", err)
  744. return err, 0, 0
  745. }
  746. defer response.Body.Close()
  747. body, err := ioutil.ReadAll(response.Body)
  748. if err != nil {
  749. //fmt.Println(err)
  750. return err, 0, 0
  751. }
  752. var data map[string]interface{}
  753. err = json.Unmarshal(body, &data)
  754. if err != nil {
  755. //fmt.Println("json Unmarshal", err)
  756. return err, 0, 0
  757. }
  758. total, err := strconv.ParseInt(data["total"].(string), 10, 32)
  759. if err != nil {
  760. //fmt.Println("ParseInt total ", err)
  761. return err, 0, 0
  762. }
  763. dataticks := data["data"].([]interface{})
  764. ntime := int64(0)
  765. for _, tmp := range dataticks {
  766. mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
  767. if mk == nil || err != nil {
  768. continue
  769. }
  770. ntime = (mk.Timestamp / 1000) + 1
  771. if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
  772. continue
  773. }
  774. cb(mk)
  775. }
  776. return nil, ntime, total
  777. }
  778. func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
  779. mk := &Market{}
  780. mk.InsId = instrumentId
  781. mk.Type = IntBty
  782. timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
  783. if err != nil {
  784. return nil, err
  785. }
  786. match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
  787. if err != nil {
  788. return nil, err
  789. }
  790. if last_offset > int64(match_id) {
  791. return nil, nil
  792. }
  793. last_offset = int64(match_id)
  794. mk.Timestamp = timestamp * 1000
  795. // value
  796. var datavalue map[string]interface{}
  797. var datasell map[string]interface{}
  798. err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
  799. if err != nil {
  800. return nil, err
  801. }
  802. if len(datavalue["sell"].([]interface{})) == 0 {
  803. return mk, nil
  804. }
  805. datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
  806. price, err := strconv.ParseFloat(datasell["price"].(string), 64)
  807. if err != nil {
  808. return nil, err
  809. }
  810. price /= 1000
  811. volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
  812. if err != nil {
  813. return nil, err
  814. }
  815. if volume < 0 {
  816. volume = -volume
  817. }
  818. mk.AllAmount = float64(volume)
  819. mk.AllVolume = float64(volume)
  820. mk.Close = price
  821. mk.Open = price
  822. mk.High = price
  823. mk.Low = price
  824. mk.LastPrice = price
  825. mk.LastVolume = float64(volume)
  826. var ask, bid PP
  827. ask[0] = price
  828. ask[1] = float64(volume)
  829. bid[0] = price
  830. bid[1] = float64(volume)
  831. mk.Asks = append(mk.Asks, ask)
  832. mk.Bids = append(mk.Bids, bid)
  833. return mk, nil
  834. }