ds_bty.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "database/sql"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. //"log"
  11. "net/http"
  12. "strconv"
  13. "time"
  14. "tickserver/markinfo"
  15. "tickserver/server/market"
  16. _ "github.com/go-sql-driver/mysql"
  17. )
  18. var btyInss = []int{
  19. //markinfo.SCCNY,
  20. //markinfo.BTCCNY,
  21. //markinfo.BTYCNY,
  22. //markinfo.ETHCNY,
  23. //markinfo.ETCCNY,
  24. //markinfo.ZECCNY,
  25. //markinfo.BTSCNY,
  26. //markinfo.LTCCNY,
  27. //markinfo.BCCCNY,
  28. //markinfo.NYCCCNY,
  29. //markinfo.WTCCNY,
  30. markinfo.ETHBTC,
  31. markinfo.ETCBTC,
  32. markinfo.ZECBTC,
  33. markinfo.LTCBTC,
  34. markinfo.BCCBTC,
  35. markinfo.ETHUSDT,
  36. markinfo.ETCUSDT,
  37. markinfo.ZECUSDT,
  38. markinfo.LTCUSDT,
  39. markinfo.BCCUSDT,
  40. markinfo.BTCUSDT,
  41. markinfo.BTYUSDT,
  42. markinfo.BTSUSDT,
  43. markinfo.SCUSDT,
  44. markinfo.BTYBTC,
  45. markinfo.BTSBTC,
  46. markinfo.SCBTC,
  47. markinfo.YCCUSDT,
  48. markinfo.BTCSUSDT,
  49. markinfo.DCRUSDT,
  50. }
  51. var btyTable = "inss_lastoffsets"
  52. var last_offset int64
  53. var bty_reader *TickRead
  54. // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存
  55. type BtyDS struct {
  56. *DSBase
  57. conf *DsConf
  58. lastOffsets []int64
  59. }
  60. func init() {
  61. drivers[Bty] = newBtyDS
  62. }
  63. func newBtyDS(conf *DsConf) (DataSource, error) {
  64. err := checkBtyTable()
  65. if err != nil {
  66. return nil, err
  67. }
  68. bds := &BtyDS{
  69. DSBase: NewDsBase(conf),
  70. conf: conf,
  71. }
  72. bds.insMap = btyInsMap()
  73. bds.lastOffsets = make([]int64, len(btyInss))
  74. for i := 0; i < len(btyInss); i++ {
  75. symbol, _ := markinfo.SymbolName(btyInss[i])
  76. lastOffset, err := getLastOffset(Bty, symbol)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bds.lastOffsets[i] = lastOffset
  81. }
  82. bty_reader = NewTickRead()
  83. return bds, nil
  84. }
  85. func (bds *BtyDS) Name() string {
  86. return Bty
  87. }
  88. func (bds *BtyDS) Run() {
  89. //log.Println("BtyDS.Run")
  90. for i, offset := range bds.lastOffsets {
  91. bds.getData(int64(btyInss[i]), offset)
  92. }
  93. for {
  94. mk, err := bty_reader.Read()
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. //log.Println("save fuck")
  99. bds.Save(mk)
  100. }
  101. }
  102. func btyInsMap() map[int64]*Instrument {
  103. insMap := make(map[int64]*Instrument)
  104. for _, id := range btyInss {
  105. x, _ := markinfo.SymbolName(id)
  106. u, _ := markinfo.SymbolUint(x)
  107. ins := &Instrument{
  108. Id: int64(id),
  109. Name: x,
  110. ExId: Bty,
  111. Type: market.Btcs,
  112. PriceInc: u,
  113. StartTime: time.Now().Unix() * 1000,
  114. }
  115. insMap[int64(id)] = ins
  116. }
  117. return insMap
  118. }
  119. func (bds *BtyDS) getData(instrumentId, offset int64) {
  120. //var reader *TickRead
  121. if instrumentId == markinfo.BTYCNY {
  122. GetMacTick(instrumentId, offset)
  123. } else {
  124. GetEthTick(instrumentId, offset)
  125. }
  126. //if instrumentId == markinfo.BTCCNY {
  127. //GetBtcTick(instrumentId, offset)
  128. //}
  129. }
  130. func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
  131. symbol, _ := markinfo.SymbolName(int(instrumentId))
  132. url := "http://47.75.62.253:46658/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  133. //url := "http://47.74.9.155:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  134. //url := "http://121.196.205.182:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  135. response, err := http.Get(url)
  136. if err != nil {
  137. //log.Println("f1", url, err)
  138. return offset, err
  139. }
  140. defer response.Body.Close()
  141. body, err := ioutil.ReadAll(response.Body)
  142. if err != nil {
  143. //log.Println("f2", url, err)
  144. return offset, err
  145. }
  146. var data map[string]interface{}
  147. err = json.Unmarshal(body, &data)
  148. if err != nil {
  149. //log.Println("f3", url, err)
  150. return offset, err
  151. }
  152. //fmt.Println(url)
  153. dataticks, ok := data["result"].([]interface{})
  154. if !ok {
  155. //log.Println("GetEthTickbyPage1", url, string(body))
  156. return offset, nil
  157. }
  158. total := len(dataticks)
  159. if total == 0 {
  160. //log.Println("GetEthTickbyPage2", url, string(body))
  161. }
  162. //offset += int64(total)
  163. for _, tmp := range dataticks {
  164. ti, id, err := toEthMK(tmp.(map[string]interface{}))
  165. if err != nil {
  166. fmt.Println("toMac=", err)
  167. continue
  168. }
  169. offset = id
  170. cb(ti)
  171. }
  172. return offset, nil
  173. }
  174. func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
  175. url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  176. response, err := http.Get(url)
  177. if err != nil {
  178. //log.Println("k1", url, err)
  179. return offset, err
  180. }
  181. defer response.Body.Close()
  182. body, err := ioutil.ReadAll(response.Body)
  183. if err != nil {
  184. //log.Println("k2", url, err)
  185. return offset, err
  186. }
  187. var data map[string]interface{}
  188. err = json.Unmarshal(body, &data)
  189. if err != nil {
  190. //log.Println("k3", url, err)
  191. return offset, err
  192. }
  193. //fmt.Println(url)
  194. dataticks, ok := data["result"].([]interface{})
  195. if !ok {
  196. //log.Println("GetBtcTickbyPage1", url, string(body))
  197. return offset, nil
  198. }
  199. total := len(dataticks)
  200. if total == 0 {
  201. //log.Println("GetBtcTickbyPage2", url, string(body))
  202. }
  203. offset += int64(total)
  204. for _, tmp := range dataticks {
  205. ti, err := toBtcMK(tmp.(map[string]interface{}))
  206. if err != nil {
  207. fmt.Println("toBtc=", err)
  208. continue
  209. }
  210. cb(ti)
  211. }
  212. return offset, nil
  213. }
  214. func toEthMK(data map[string]interface{}) (*Market, int64, error) {
  215. mk := &Market{}
  216. mk.Type = IntBty
  217. symbolId := int32(data["symbolid"].(float64))
  218. switch symbolId {
  219. case 327681:
  220. mk.InsId = markinfo.ETCCNY
  221. case 131073:
  222. mk.InsId = markinfo.BTCCNY
  223. case 262145:
  224. mk.InsId = markinfo.ETHCNY
  225. case 458753:
  226. mk.InsId = markinfo.SCCNY
  227. case 524289:
  228. mk.InsId = markinfo.ZECCNY
  229. case 589825:
  230. mk.InsId = markinfo.BTSCNY
  231. case 655361:
  232. mk.InsId = markinfo.LTCCNY
  233. case 720897:
  234. mk.InsId = markinfo.BCCCNY
  235. case 851969:
  236. mk.InsId = markinfo.NYCCCNY
  237. case 917505:
  238. mk.InsId = markinfo.WTCCNY
  239. case 262146:
  240. mk.InsId = markinfo.ETHBTC
  241. case 327682:
  242. mk.InsId = markinfo.ETCBTC
  243. case 524290:
  244. mk.InsId = markinfo.ZECBTC
  245. case 655362:
  246. mk.InsId = markinfo.LTCBTC
  247. case 720898:
  248. mk.InsId = markinfo.BCCBTC
  249. case 262159:
  250. mk.InsId = markinfo.ETHUSDT
  251. case 327695:
  252. mk.InsId = markinfo.ETCUSDT
  253. case 524303:
  254. mk.InsId = markinfo.ZECUSDT
  255. case 655375:
  256. mk.InsId = markinfo.LTCUSDT
  257. case 720911:
  258. mk.InsId = markinfo.BCCUSDT
  259. case 131087:
  260. mk.InsId = markinfo.BTCUSDT
  261. case 196623:
  262. mk.InsId = markinfo.BTYUSDT
  263. case 589839:
  264. mk.InsId = markinfo.BTSUSDT
  265. case 458767:
  266. mk.InsId = markinfo.SCUSDT
  267. case 196610:
  268. mk.InsId = markinfo.BTYBTC
  269. case 589826:
  270. mk.InsId = markinfo.BTSBTC
  271. case 458754:
  272. mk.InsId = markinfo.SCBTC
  273. case 786447:
  274. mk.InsId = markinfo.YCCUSDT
  275. case 1048591:
  276. mk.InsId = markinfo.BTCSUSDT
  277. case 1114127:
  278. mk.InsId = markinfo.DCRUSDT
  279. default:
  280. return nil, 0, errors.New("invalid symbolid")
  281. }
  282. mk.Timestamp = parseKTime(data["time"].(string))
  283. price := data["price"].(float64)
  284. volume, _ := data["quantity"].(float64)
  285. id := int64(data["id"].(float64))
  286. price /= (10000 * 10000)
  287. volume /= 100000000
  288. mk.Close = price
  289. mk.Open = price
  290. mk.High = price
  291. mk.Low = price
  292. mk.AllAmount = volume
  293. mk.AllVolume = volume
  294. mk.LastPrice = price
  295. mk.LastVolume = volume
  296. var ask, bid PP
  297. ask[0] = price
  298. ask[1] = volume
  299. bid[0] = price
  300. bid[1] = volume
  301. mk.Asks = append(mk.Asks, ask)
  302. mk.Bids = append(mk.Bids, bid)
  303. return mk, id, nil
  304. }
  305. func parseKTime(timeStr string) int64 {
  306. var year, month, day, hour, minute, second int
  307. fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second)
  308. t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local)
  309. return t.Unix() * 1000
  310. }
  311. func toBtcMK(data map[string]interface{}) (*Market, error) {
  312. mk := &Market{}
  313. mk.Type = IntBty
  314. mk.InsId = markinfo.BTCCNY
  315. tick_time := data["time"]
  316. f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
  317. mk.Timestamp = int64(f_tick_time / (1e6))
  318. price := data["price"].(float64)
  319. volume := data["quantity"].(float64)
  320. price /= (100 * 10000)
  321. //volume /= 100
  322. mk.Close = price
  323. mk.Open = price
  324. mk.High = price
  325. mk.Low = price
  326. mk.AllAmount = volume
  327. mk.AllVolume = volume
  328. mk.LastPrice = price
  329. mk.LastVolume = volume
  330. var ask, bid PP
  331. ask[0] = price
  332. ask[1] = volume
  333. bid[0] = price
  334. bid[1] = volume
  335. mk.Asks = append(mk.Asks, ask)
  336. mk.Bids = append(mk.Bids, bid)
  337. return mk, nil
  338. }
  339. type TickRead struct {
  340. ch chan *Market
  341. err chan error
  342. }
  343. func NewTickRead() *TickRead {
  344. ch := make(chan *Market, 1024)
  345. errch := make(chan error)
  346. reader := &TickRead{}
  347. reader.ch = ch
  348. reader.err = errch
  349. return reader
  350. }
  351. func (tr *TickRead) Read() (*Market, error) {
  352. tick := <-tr.ch
  353. if tick == nil {
  354. return nil, <-tr.err
  355. }
  356. return tick, nil
  357. }
  358. func GetEthTick(instrumentId, offset int64) error {
  359. //reader := NewTickRead()
  360. lasttime := int64(0)
  361. go func() {
  362. for {
  363. offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
  364. if mk != nil {
  365. if mk.Timestamp >= lasttime {
  366. bty_reader.ch <- mk
  367. lasttime = mk.Timestamp
  368. }
  369. }
  370. })
  371. time.Sleep(time.Second)
  372. if err != nil {
  373. //log.Println("GetEthTick", err, offset)
  374. continue
  375. }
  376. if offset < offset_next {
  377. symbol, _ := markinfo.SymbolName(int(instrumentId))
  378. err := updateLastoffset(Bty, symbol, offset_next)
  379. if err != nil {
  380. continue
  381. }
  382. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  383. offset = offset_next
  384. }
  385. //if !isEndPage {
  386. //continue
  387. //}
  388. }
  389. }()
  390. return nil
  391. }
  392. func GetBtcTick(instrumentId, offset int64) error {
  393. //reader := NewTickRead()
  394. lasttime := int64(0)
  395. go func() {
  396. for {
  397. //offset, _ = getLastOffset(Bty, "BTCCNY")
  398. offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
  399. if mk != nil {
  400. if mk.Timestamp >= lasttime {
  401. bty_reader.ch <- mk
  402. lasttime = mk.Timestamp
  403. }
  404. }
  405. })
  406. time.Sleep(time.Second)
  407. if err != nil {
  408. //log.Println("GetBtcTick", err, offset)
  409. continue
  410. }
  411. if offset < offset_next {
  412. symbol, _ := markinfo.SymbolName(int(instrumentId))
  413. err := updateLastoffset(Bty, symbol, offset_next)
  414. if err != nil {
  415. continue
  416. }
  417. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  418. offset = offset_next
  419. }
  420. //if !isEndPage {
  421. //continue
  422. //}
  423. }
  424. }()
  425. return nil
  426. }
  427. /*创建数据表*/
  428. func checkBtyTable() error {
  429. sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
  430. _, err := db.Exec(sql)
  431. if err != nil {
  432. return err
  433. }
  434. return nil
  435. }
  436. func getLastOffset(ty, insId string) (lastOffset int64, err error) {
  437. szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
  438. row := db.QueryRow(szSelectTable)
  439. err = row.Scan(&lastOffset)
  440. if err == sql.ErrNoRows {
  441. q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
  442. _, err = db.Exec(q)
  443. if err != nil {
  444. fmt.Println("getLastOffset", err)
  445. return lastOffset, err
  446. }
  447. return lastOffset, nil
  448. }
  449. return lastOffset, err
  450. }
  451. func updateLastoffset(typ, insId string, lastOffset int64) error {
  452. //INSERT INTO
  453. q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
  454. _, err := db.Exec(q)
  455. if err != nil {
  456. fmt.Println("updateLastoffset", err)
  457. }
  458. return err
  459. }
  460. func GetMacTick(instrumentId, ntime int64) error {
  461. //reader := NewTickRead()
  462. go func() {
  463. for {
  464. err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
  465. if mk != nil {
  466. bty_reader.ch <- mk
  467. }
  468. })
  469. time.Sleep(time.Second)
  470. //fmt.Println(err, ntime, isEndPage)
  471. if err != nil {
  472. //log.Println("GetMacTick", err)
  473. continue
  474. }
  475. if ntime < _ntime {
  476. symbol, _ := markinfo.SymbolName(int(instrumentId))
  477. err := updateLastoffset(Bty, symbol, _ntime)
  478. if err != nil {
  479. continue
  480. }
  481. //time.Sleep(time.Duration(total/100) * time.Second)
  482. ntime = _ntime
  483. }
  484. //if !isEndPage {
  485. //continue
  486. //}
  487. }
  488. }()
  489. return nil
  490. }
  491. func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
  492. url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
  493. //fmt.Println(url)
  494. response, err := http.Get(url)
  495. if err != nil {
  496. //fmt.Println("macoin", err)
  497. return err, 0, 0
  498. }
  499. defer response.Body.Close()
  500. body, err := ioutil.ReadAll(response.Body)
  501. if err != nil {
  502. //fmt.Println(err)
  503. return err, 0, 0
  504. }
  505. var data map[string]interface{}
  506. err = json.Unmarshal(body, &data)
  507. if err != nil {
  508. //fmt.Println("json Unmarshal", err)
  509. return err, 0, 0
  510. }
  511. total, err := strconv.ParseInt(data["total"].(string), 10, 32)
  512. if err != nil {
  513. //fmt.Println("ParseInt total ", err)
  514. return err, 0, 0
  515. }
  516. dataticks := data["data"].([]interface{})
  517. ntime := int64(0)
  518. for _, tmp := range dataticks {
  519. mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
  520. if mk == nil || err != nil {
  521. continue
  522. }
  523. ntime = (mk.Timestamp / 1000) + 1
  524. if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
  525. continue
  526. }
  527. cb(mk)
  528. }
  529. return nil, ntime, total
  530. }
  531. func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
  532. mk := &Market{}
  533. mk.InsId = instrumentId
  534. mk.Type = IntBty
  535. timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
  536. if err != nil {
  537. return nil, err
  538. }
  539. match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
  540. if err != nil {
  541. return nil, err
  542. }
  543. if last_offset > int64(match_id) {
  544. return nil, nil
  545. }
  546. last_offset = int64(match_id)
  547. mk.Timestamp = timestamp * 1000
  548. // value
  549. var datavalue map[string]interface{}
  550. var datasell map[string]interface{}
  551. err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
  552. if err != nil {
  553. return nil, err
  554. }
  555. if len(datavalue["sell"].([]interface{})) == 0 {
  556. return mk, nil
  557. }
  558. datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
  559. price, err := strconv.ParseFloat(datasell["price"].(string), 64)
  560. if err != nil {
  561. return nil, err
  562. }
  563. price /= 1000
  564. volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
  565. if err != nil {
  566. return nil, err
  567. }
  568. if volume < 0 {
  569. volume = -volume
  570. }
  571. mk.AllAmount = float64(volume)
  572. mk.AllVolume = float64(volume)
  573. mk.Close = price
  574. mk.Open = price
  575. mk.High = price
  576. mk.Low = price
  577. mk.LastPrice = price
  578. mk.LastVolume = float64(volume)
  579. var ask, bid PP
  580. ask[0] = price
  581. ask[1] = float64(volume)
  582. bid[0] = price
  583. bid[1] = float64(volume)
  584. mk.Asks = append(mk.Asks, ask)
  585. mk.Bids = append(mk.Bids, bid)
  586. return mk, nil
  587. }