ds_bty.go 14 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // This file implements the bityuan data source interface: fetching and saving real-time and historical data.
  4. import (
  5. "database/sql"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. //"log"
  11. "net/http"
  12. "strconv"
  13. "time"
  14. "tickserver/markinfo"
  15. "tickserver/server/market"
  16. _ "github.com/go-sql-driver/mysql"
  17. )
  18. var btyInss = []int{
  19. //markinfo.SCCNY,
  20. //markinfo.BTCCNY,
  21. //markinfo.BTYCNY,
  22. //markinfo.ETHCNY,
  23. //markinfo.ETCCNY,
  24. //markinfo.ZECCNY,
  25. //markinfo.BTSCNY,
  26. //markinfo.LTCCNY,
  27. //markinfo.BCCCNY,
  28. //markinfo.NYCCCNY,
  29. //markinfo.WTCCNY,
  30. markinfo.ETHBTC,
  31. markinfo.ETCBTC,
  32. markinfo.ZECBTC,
  33. markinfo.LTCBTC,
  34. markinfo.BCCBTC,
  35. markinfo.ETHUSDT,
  36. markinfo.ETCUSDT,
  37. markinfo.ZECUSDT,
  38. markinfo.LTCUSDT,
  39. markinfo.BCCUSDT,
  40. markinfo.BTCUSDT,
  41. markinfo.BTYUSDT,
  42. markinfo.BTSUSDT,
  43. markinfo.SCUSDT,
  44. markinfo.BTYBTC,
  45. markinfo.BTSBTC,
  46. markinfo.SCBTC,
  47. markinfo.YCCUSDT,
  48. markinfo.BTCSUSDT,
  49. markinfo.DCRUSDT,
  50. }
  51. var btyTable = "inss_lastoffsets"
  52. var last_offset int64
  53. var bty_reader *TickRead
  // BtyDS implements the dataSource interface and saves both the historical
  // and the real-time data of bityuan.
  type BtyDS struct {
  	*DSBase
  	// conf is the configuration this data source was created with.
  	conf *DsConf
  	// lastOffsets holds the persisted fetch offset of each instrument,
  	// index-aligned with btyInss.
  	lastOffsets []int64
  }
  // init registers newBtyDS in drivers under the Bty key.
  func init() {
  	drivers[Bty] = newBtyDS
  }
  63. func newBtyDS(conf *DsConf) (DataSource, error) {
  64. err := checkBtyTable()
  65. if err != nil {
  66. return nil, err
  67. }
  68. bds := &BtyDS{
  69. DSBase: NewDsBase(conf),
  70. conf: conf,
  71. }
  72. bds.insMap = btyInsMap()
  73. bds.lastOffsets = make([]int64, len(btyInss))
  74. for i := 0; i < len(btyInss); i++ {
  75. symbol, _ := markinfo.SymbolName(btyInss[i])
  76. lastOffset, err := getLastOffset(Bty, symbol)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bds.lastOffsets[i] = lastOffset
  81. }
  82. bty_reader = NewTickRead()
  83. return bds, nil
  84. }
  // Name returns the identifier of this data source, the Bty constant.
  func (bds *BtyDS) Name() string {
  	return Bty
  }
  88. func (bds *BtyDS) Run() {
  89. //log.Println("BtyDS.Run")
  90. for i, offset := range bds.lastOffsets {
  91. bds.getData(int64(btyInss[i]), offset)
  92. }
  93. for {
  94. mk, err := bty_reader.Read()
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. //log.Println("save fuck")
  99. bds.Save(mk)
  100. }
  101. }
  102. func btyInsMap() map[int64]*Instrument {
  103. insMap := make(map[int64]*Instrument)
  104. for _, id := range btyInss {
  105. x, _ := markinfo.SymbolName(id)
  106. u, _ := markinfo.SymbolUint(x)
  107. ins := &Instrument{
  108. Id: int64(id),
  109. Name: x,
  110. ExId: Bty,
  111. Type: market.Btcs,
  112. PriceInc: u,
  113. StartTime: time.Now().Unix() * 1000,
  114. }
  115. insMap[int64(id)] = ins
  116. }
  117. return insMap
  118. }
  119. func (bds *BtyDS) getData(instrumentId, offset int64) {
  120. //var reader *TickRead
  121. if instrumentId == markinfo.BTYCNY {
  122. GetMacTick(instrumentId, offset)
  123. } else {
  124. GetEthTick(instrumentId, offset)
  125. }
  126. //if instrumentId == markinfo.BTCCNY {
  127. //GetBtcTick(instrumentId, offset)
  128. //}
  129. }
  130. func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
  131. symbol, _ := markinfo.SymbolName(int(instrumentId))
  132. //url := "http://47.75.62.253:46658/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  133. //url := "http://47.74.9.155:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  134. url := "http://10.0.1.5:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  135. //url := "http://121.196.205.182:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  136. response, err := http.Get(url)
  137. if err != nil {
  138. //log.Println("f1", url, err)
  139. return offset, err
  140. }
  141. defer response.Body.Close()
  142. body, err := ioutil.ReadAll(response.Body)
  143. if err != nil {
  144. //log.Println("f2", url, err)
  145. return offset, err
  146. }
  147. var data map[string]interface{}
  148. err = json.Unmarshal(body, &data)
  149. if err != nil {
  150. //log.Println("f3", url, err)
  151. return offset, err
  152. }
  153. //fmt.Println(url)
  154. dataticks, ok := data["result"].([]interface{})
  155. if !ok {
  156. //log.Println("GetEthTickbyPage1", url, string(body))
  157. return offset, nil
  158. }
  159. total := len(dataticks)
  160. if total == 0 {
  161. //log.Println("GetEthTickbyPage2", url, string(body))
  162. }
  163. //offset += int64(total)
  164. for _, tmp := range dataticks {
  165. ti, id, err := toEthMK(tmp.(map[string]interface{}))
  166. if err != nil {
  167. fmt.Println("toMac=", err)
  168. continue
  169. }
  170. offset = id
  171. cb(ti)
  172. }
  173. return offset, nil
  174. }
  175. func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
  176. url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  177. response, err := http.Get(url)
  178. if err != nil {
  179. //log.Println("k1", url, err)
  180. return offset, err
  181. }
  182. defer response.Body.Close()
  183. body, err := ioutil.ReadAll(response.Body)
  184. if err != nil {
  185. //log.Println("k2", url, err)
  186. return offset, err
  187. }
  188. var data map[string]interface{}
  189. err = json.Unmarshal(body, &data)
  190. if err != nil {
  191. //log.Println("k3", url, err)
  192. return offset, err
  193. }
  194. //fmt.Println(url)
  195. dataticks, ok := data["result"].([]interface{})
  196. if !ok {
  197. //log.Println("GetBtcTickbyPage1", url, string(body))
  198. return offset, nil
  199. }
  200. total := len(dataticks)
  201. if total == 0 {
  202. //log.Println("GetBtcTickbyPage2", url, string(body))
  203. }
  204. offset += int64(total)
  205. for _, tmp := range dataticks {
  206. ti, err := toBtcMK(tmp.(map[string]interface{}))
  207. if err != nil {
  208. fmt.Println("toBtc=", err)
  209. continue
  210. }
  211. cb(ti)
  212. }
  213. return offset, nil
  214. }
  215. func toEthMK(data map[string]interface{}) (*Market, int64, error) {
  216. mk := &Market{}
  217. mk.Type = IntBty
  218. symbolId := int32(data["symbolid"].(float64))
  219. switch symbolId {
  220. case 327681:
  221. mk.InsId = markinfo.ETCCNY
  222. case 131073:
  223. mk.InsId = markinfo.BTCCNY
  224. case 262145:
  225. mk.InsId = markinfo.ETHCNY
  226. case 458753:
  227. mk.InsId = markinfo.SCCNY
  228. case 524289:
  229. mk.InsId = markinfo.ZECCNY
  230. case 589825:
  231. mk.InsId = markinfo.BTSCNY
  232. case 655361:
  233. mk.InsId = markinfo.LTCCNY
  234. case 720897:
  235. mk.InsId = markinfo.BCCCNY
  236. case 851969:
  237. mk.InsId = markinfo.NYCCCNY
  238. case 917505:
  239. mk.InsId = markinfo.WTCCNY
  240. case 262146:
  241. mk.InsId = markinfo.ETHBTC
  242. case 327682:
  243. mk.InsId = markinfo.ETCBTC
  244. case 524290:
  245. mk.InsId = markinfo.ZECBTC
  246. case 655362:
  247. mk.InsId = markinfo.LTCBTC
  248. case 720898:
  249. mk.InsId = markinfo.BCCBTC
  250. case 262159:
  251. mk.InsId = markinfo.ETHUSDT
  252. case 327695:
  253. mk.InsId = markinfo.ETCUSDT
  254. case 524303:
  255. mk.InsId = markinfo.ZECUSDT
  256. case 655375:
  257. mk.InsId = markinfo.LTCUSDT
  258. case 720911:
  259. mk.InsId = markinfo.BCCUSDT
  260. case 131087:
  261. mk.InsId = markinfo.BTCUSDT
  262. case 196623:
  263. mk.InsId = markinfo.BTYUSDT
  264. case 589839:
  265. mk.InsId = markinfo.BTSUSDT
  266. case 458767:
  267. mk.InsId = markinfo.SCUSDT
  268. case 196610:
  269. mk.InsId = markinfo.BTYBTC
  270. case 589826:
  271. mk.InsId = markinfo.BTSBTC
  272. case 458754:
  273. mk.InsId = markinfo.SCBTC
  274. case 786447:
  275. mk.InsId = markinfo.YCCUSDT
  276. case 1048591:
  277. mk.InsId = markinfo.BTCSUSDT
  278. case 1114127:
  279. mk.InsId = markinfo.DCRUSDT
  280. default:
  281. return nil, 0, errors.New("invalid symbolid")
  282. }
  283. mk.Timestamp = parseKTime(data["time"].(string))
  284. price := data["price"].(float64)
  285. volume, _ := data["quantity"].(float64)
  286. id := int64(data["id"].(float64))
  287. price /= (10000 * 10000)
  288. volume /= 100000000
  289. mk.Close = price
  290. mk.Open = price
  291. mk.High = price
  292. mk.Low = price
  293. mk.AllAmount = volume
  294. mk.AllVolume = volume
  295. mk.LastPrice = price
  296. mk.LastVolume = volume
  297. var ask, bid PP
  298. ask[0] = price
  299. ask[1] = volume
  300. bid[0] = price
  301. bid[1] = volume
  302. mk.Asks = append(mk.Asks, ask)
  303. mk.Bids = append(mk.Bids, bid)
  304. return mk, id, nil
  305. }
  // kTimeLocation is the time zone kline timestamps are interpreted in. It is
  // resolved once instead of on every call. When the tz database is not
  // available, a fixed UTC+8 offset (current China Standard Time) is used
  // instead of a nil location, which would make time.Date panic.
  var kTimeLocation = func() *time.Location {
  	loc, err := time.LoadLocation("Asia/Chongqing")
  	if err != nil {
  		return time.FixedZone("CST", 8*60*60)
  	}
  	return loc
  }()

  // parseKTime parses a "YYYY-MM-DD hh:mm:ss" timestamp, interpreted in the
  // Asia/Chongqing zone, and returns it as Unix time in milliseconds.
  //
  // It returns 0 when timeStr does not contain all six fields, so callers never
  // receive a timestamp built from partially parsed input.
  func parseKTime(timeStr string) int64 {
  	var year, month, day, hour, minute, second int
  	_, err := fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d",
  		&year, &month, &day, &hour, &minute, &second)
  	if err != nil {
  		return 0
  	}
  	t := time.Date(year, time.Month(month), day, hour, minute, second, 0, kTimeLocation)
  	return t.Unix() * 1000
  }
  313. func toBtcMK(data map[string]interface{}) (*Market, error) {
  314. mk := &Market{}
  315. mk.Type = IntBty
  316. mk.InsId = markinfo.BTCCNY
  317. tick_time := data["time"]
  318. f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
  319. mk.Timestamp = int64(f_tick_time / (1e6))
  320. price := data["price"].(float64)
  321. volume := data["quantity"].(float64)
  322. price /= (100 * 10000)
  323. //volume /= 100
  324. mk.Close = price
  325. mk.Open = price
  326. mk.High = price
  327. mk.Low = price
  328. mk.AllAmount = volume
  329. mk.AllVolume = volume
  330. mk.LastPrice = price
  331. mk.LastVolume = volume
  332. var ask, bid PP
  333. ask[0] = price
  334. ask[1] = volume
  335. bid[0] = price
  336. bid[1] = volume
  337. mk.Asks = append(mk.Asks, ask)
  338. mk.Bids = append(mk.Bids, bid)
  339. return mk, nil
  340. }
  // TickRead is a channel-backed queue of ticks with a companion error channel.
  type TickRead struct {
  	// ch carries the ticks; Read treats a nil tick as a signal to take an
  	// error from err.
  	ch chan *Market
  	// err carries the error that accompanies a nil tick.
  	err chan error
  }
  345. func NewTickRead() *TickRead {
  346. ch := make(chan *Market, 1024)
  347. errch := make(chan error)
  348. reader := &TickRead{}
  349. reader.ch = ch
  350. reader.err = errch
  351. return reader
  352. }
  353. func (tr *TickRead) Read() (*Market, error) {
  354. tick := <-tr.ch
  355. if tick == nil {
  356. return nil, <-tr.err
  357. }
  358. return tick, nil
  359. }
  360. func GetEthTick(instrumentId, offset int64) error {
  361. //reader := NewTickRead()
  362. lasttime := int64(0)
  363. go func() {
  364. for {
  365. offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
  366. if mk != nil {
  367. if mk.Timestamp >= lasttime {
  368. bty_reader.ch <- mk
  369. lasttime = mk.Timestamp
  370. }
  371. }
  372. })
  373. time.Sleep(time.Second)
  374. if err != nil {
  375. //log.Println("GetEthTick", err, offset)
  376. continue
  377. }
  378. if offset < offset_next {
  379. symbol, _ := markinfo.SymbolName(int(instrumentId))
  380. err := updateLastoffset(Bty, symbol, offset_next)
  381. if err != nil {
  382. continue
  383. }
  384. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  385. offset = offset_next
  386. }
  387. //if !isEndPage {
  388. //continue
  389. //}
  390. }
  391. }()
  392. return nil
  393. }
  394. func GetBtcTick(instrumentId, offset int64) error {
  395. //reader := NewTickRead()
  396. lasttime := int64(0)
  397. go func() {
  398. for {
  399. //offset, _ = getLastOffset(Bty, "BTCCNY")
  400. offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
  401. if mk != nil {
  402. if mk.Timestamp >= lasttime {
  403. bty_reader.ch <- mk
  404. lasttime = mk.Timestamp
  405. }
  406. }
  407. })
  408. time.Sleep(time.Second)
  409. if err != nil {
  410. //log.Println("GetBtcTick", err, offset)
  411. continue
  412. }
  413. if offset < offset_next {
  414. symbol, _ := markinfo.SymbolName(int(instrumentId))
  415. err := updateLastoffset(Bty, symbol, offset_next)
  416. if err != nil {
  417. continue
  418. }
  419. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  420. offset = offset_next
  421. }
  422. //if !isEndPage {
  423. //continue
  424. //}
  425. }
  426. }()
  427. return nil
  428. }
  429. /*创建数据表*/
  430. func checkBtyTable() error {
  431. sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
  432. _, err := db.Exec(sql)
  433. if err != nil {
  434. return err
  435. }
  436. return nil
  437. }
  438. func getLastOffset(ty, insId string) (lastOffset int64, err error) {
  439. szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
  440. row := db.QueryRow(szSelectTable)
  441. err = row.Scan(&lastOffset)
  442. if err == sql.ErrNoRows {
  443. q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
  444. _, err = db.Exec(q)
  445. if err != nil {
  446. fmt.Println("getLastOffset", err)
  447. return lastOffset, err
  448. }
  449. return lastOffset, nil
  450. }
  451. return lastOffset, err
  452. }
  453. func updateLastoffset(typ, insId string, lastOffset int64) error {
  454. //INSERT INTO
  455. q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
  456. _, err := db.Exec(q)
  457. if err != nil {
  458. fmt.Println("updateLastoffset", err)
  459. }
  460. return err
  461. }
  462. func GetMacTick(instrumentId, ntime int64) error {
  463. //reader := NewTickRead()
  464. go func() {
  465. for {
  466. err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
  467. if mk != nil {
  468. bty_reader.ch <- mk
  469. }
  470. })
  471. time.Sleep(time.Second)
  472. //fmt.Println(err, ntime, isEndPage)
  473. if err != nil {
  474. //log.Println("GetMacTick", err)
  475. continue
  476. }
  477. if ntime < _ntime {
  478. symbol, _ := markinfo.SymbolName(int(instrumentId))
  479. err := updateLastoffset(Bty, symbol, _ntime)
  480. if err != nil {
  481. continue
  482. }
  483. //time.Sleep(time.Duration(total/100) * time.Second)
  484. ntime = _ntime
  485. }
  486. //if !isEndPage {
  487. //continue
  488. //}
  489. }
  490. }()
  491. return nil
  492. }
  493. func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
  494. url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
  495. //fmt.Println(url)
  496. response, err := http.Get(url)
  497. if err != nil {
  498. //fmt.Println("macoin", err)
  499. return err, 0, 0
  500. }
  501. defer response.Body.Close()
  502. body, err := ioutil.ReadAll(response.Body)
  503. if err != nil {
  504. //fmt.Println(err)
  505. return err, 0, 0
  506. }
  507. var data map[string]interface{}
  508. err = json.Unmarshal(body, &data)
  509. if err != nil {
  510. //fmt.Println("json Unmarshal", err)
  511. return err, 0, 0
  512. }
  513. total, err := strconv.ParseInt(data["total"].(string), 10, 32)
  514. if err != nil {
  515. //fmt.Println("ParseInt total ", err)
  516. return err, 0, 0
  517. }
  518. dataticks := data["data"].([]interface{})
  519. ntime := int64(0)
  520. for _, tmp := range dataticks {
  521. mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
  522. if mk == nil || err != nil {
  523. continue
  524. }
  525. ntime = (mk.Timestamp / 1000) + 1
  526. if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
  527. continue
  528. }
  529. cb(mk)
  530. }
  531. return nil, ntime, total
  532. }
  533. func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
  534. mk := &Market{}
  535. mk.InsId = instrumentId
  536. mk.Type = IntBty
  537. timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
  538. if err != nil {
  539. return nil, err
  540. }
  541. match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
  542. if err != nil {
  543. return nil, err
  544. }
  545. if last_offset > int64(match_id) {
  546. return nil, nil
  547. }
  548. last_offset = int64(match_id)
  549. mk.Timestamp = timestamp * 1000
  550. // value
  551. var datavalue map[string]interface{}
  552. var datasell map[string]interface{}
  553. err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
  554. if err != nil {
  555. return nil, err
  556. }
  557. if len(datavalue["sell"].([]interface{})) == 0 {
  558. return mk, nil
  559. }
  560. datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
  561. price, err := strconv.ParseFloat(datasell["price"].(string), 64)
  562. if err != nil {
  563. return nil, err
  564. }
  565. price /= 1000
  566. volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
  567. if err != nil {
  568. return nil, err
  569. }
  570. if volume < 0 {
  571. volume = -volume
  572. }
  573. mk.AllAmount = float64(volume)
  574. mk.AllVolume = float64(volume)
  575. mk.Close = price
  576. mk.Open = price
  577. mk.High = price
  578. mk.Low = price
  579. mk.LastPrice = price
  580. mk.LastVolume = float64(volume)
  581. var ask, bid PP
  582. ask[0] = price
  583. ask[1] = float64(volume)
  584. bid[0] = price
  585. bid[1] = float64(volume)
  586. mk.Asks = append(mk.Asks, ask)
  587. mk.Bids = append(mk.Bids, bid)
  588. return mk, nil
  589. }