ds_bty.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. // 本文件实现bityuan数据源接口, 实时数据和历史数据的获取和保存
  4. import (
  5. "database/sql"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. //"log"
  11. "net/http"
  12. "strconv"
  13. "time"
  14. "tickserver/markinfo"
  15. "tickserver/server/market"
  16. _ "github.com/go-sql-driver/mysql"
  17. )
  18. var btyInss = []int{
  19. //markinfo.SCCNY,
  20. //markinfo.BTCCNY,
  21. //markinfo.BTYCNY,
  22. //markinfo.ETHCNY,
  23. //markinfo.ETCCNY,
  24. //markinfo.ZECCNY,
  25. //markinfo.BTSCNY,
  26. //markinfo.LTCCNY,
  27. //markinfo.BCCCNY,
  28. //markinfo.NYCCCNY,
  29. //markinfo.WTCCNY,
  30. markinfo.ETHBTC,
  31. markinfo.ETCBTC,
  32. markinfo.ZECBTC,
  33. markinfo.LTCBTC,
  34. markinfo.BCCBTC,
  35. markinfo.ETHUSDT,
  36. markinfo.ETCUSDT,
  37. markinfo.ZECUSDT,
  38. markinfo.LTCUSDT,
  39. markinfo.BCCUSDT,
  40. markinfo.BTCUSDT,
  41. markinfo.BTYUSDT,
  42. markinfo.BTSUSDT,
  43. markinfo.SCUSDT,
  44. markinfo.BTYBTC,
  45. markinfo.BTSBTC,
  46. markinfo.SCBTC,
  47. markinfo.YCCUSDT,
  48. markinfo.BTCSUSDT,
  49. markinfo.DCRUSDT,
  50. }
  51. var btyTable = "inss_lastoffsets"
  52. var last_offset int64
  53. var bty_reader *TickRead
  54. // BtyDS实现了dataSource接口, 并对bityuan的历史数据和实时数据保存
  55. type BtyDS struct {
  56. *DSBase
  57. conf *DsConf
  58. lastOffsets []int64
  59. }
  60. func init() {
  61. drivers[Bty] = newBtyDS
  62. }
  63. func newBtyDS(conf *DsConf) (DataSource, error) {
  64. err := checkBtyTable()
  65. if err != nil {
  66. return nil, err
  67. }
  68. bds := &BtyDS{
  69. DSBase: NewDsBase(conf),
  70. conf: conf,
  71. }
  72. bds.insMap = btyInsMap()
  73. bds.lastOffsets = make([]int64, len(btyInss))
  74. for i := 0; i < len(btyInss); i++ {
  75. symbol, _ := markinfo.SymbolName(btyInss[i])
  76. lastOffset, err := getLastOffset(Bty, symbol)
  77. if err != nil {
  78. return nil, err
  79. }
  80. bds.lastOffsets[i] = lastOffset
  81. }
  82. bty_reader = NewTickRead()
  83. return bds, nil
  84. }
  85. func (bds *BtyDS) Name() string {
  86. return Bty
  87. }
  88. func (bds *BtyDS) Run() {
  89. //log.Println("BtyDS.Run")
  90. for i, offset := range bds.lastOffsets {
  91. bds.getData(int64(btyInss[i]), offset)
  92. }
  93. for {
  94. mk, err := bty_reader.Read()
  95. if err != nil {
  96. fmt.Println(err)
  97. }
  98. //log.Println("save fuck")
  99. bds.Save(mk)
  100. }
  101. }
  102. func btyInsMap() map[int64]*Instrument {
  103. insMap := make(map[int64]*Instrument)
  104. for _, id := range btyInss {
  105. x, _ := markinfo.SymbolName(id)
  106. u, _ := markinfo.SymbolUint(x)
  107. ins := &Instrument{
  108. Id: int64(id),
  109. Name: x,
  110. ExId: Bty,
  111. Type: market.Btcs,
  112. PriceInc: u,
  113. StartTime: time.Now().Unix() * 1000,
  114. }
  115. insMap[int64(id)] = ins
  116. }
  117. return insMap
  118. }
  119. func (bds *BtyDS) getData(instrumentId, offset int64) {
  120. //var reader *TickRead
  121. if instrumentId == markinfo.BTYCNY {
  122. GetMacTick(instrumentId, offset)
  123. } else {
  124. GetEthTick(instrumentId, offset)
  125. }
  126. //if instrumentId == markinfo.BTCCNY {
  127. //GetBtcTick(instrumentId, offset)
  128. //}
  129. }
  130. func GetEthTickbyPage(instrumentId, offset int64, cb func(*Market)) (int64, error) {
  131. symbol, _ := markinfo.SymbolName(int(instrumentId))
  132. url := "http://47.74.9.155:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  133. //url := "http://121.196.205.182:45656/tender/default/kline?symbol=" + symbol + "&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  134. response, err := http.Get(url)
  135. if err != nil {
  136. //log.Println("f1", url, err)
  137. return offset, err
  138. }
  139. defer response.Body.Close()
  140. body, err := ioutil.ReadAll(response.Body)
  141. if err != nil {
  142. //log.Println("f2", url, err)
  143. return offset, err
  144. }
  145. var data map[string]interface{}
  146. err = json.Unmarshal(body, &data)
  147. if err != nil {
  148. //log.Println("f3", url, err)
  149. return offset, err
  150. }
  151. //fmt.Println(url)
  152. dataticks, ok := data["result"].([]interface{})
  153. if !ok {
  154. //log.Println("GetEthTickbyPage1", url, string(body))
  155. return offset, nil
  156. }
  157. total := len(dataticks)
  158. if total == 0 {
  159. //log.Println("GetEthTickbyPage2", url, string(body))
  160. }
  161. //offset += int64(total)
  162. for _, tmp := range dataticks {
  163. ti, id, err := toEthMK(tmp.(map[string]interface{}))
  164. if err != nil {
  165. fmt.Println("toMac=", err)
  166. continue
  167. }
  168. offset = id
  169. cb(ti)
  170. }
  171. return offset, nil
  172. }
  173. func GetBtcTickbyPage(offset int64, cb func(*Market)) (int64, error) {
  174. url := "http://32.33.cn:9902/tender/default/kline?symbol=BTC&offset=" + fmt.Sprintf("%d", offset) + "&limit=100"
  175. response, err := http.Get(url)
  176. if err != nil {
  177. //log.Println("k1", url, err)
  178. return offset, err
  179. }
  180. defer response.Body.Close()
  181. body, err := ioutil.ReadAll(response.Body)
  182. if err != nil {
  183. //log.Println("k2", url, err)
  184. return offset, err
  185. }
  186. var data map[string]interface{}
  187. err = json.Unmarshal(body, &data)
  188. if err != nil {
  189. //log.Println("k3", url, err)
  190. return offset, err
  191. }
  192. //fmt.Println(url)
  193. dataticks, ok := data["result"].([]interface{})
  194. if !ok {
  195. //log.Println("GetBtcTickbyPage1", url, string(body))
  196. return offset, nil
  197. }
  198. total := len(dataticks)
  199. if total == 0 {
  200. //log.Println("GetBtcTickbyPage2", url, string(body))
  201. }
  202. offset += int64(total)
  203. for _, tmp := range dataticks {
  204. ti, err := toBtcMK(tmp.(map[string]interface{}))
  205. if err != nil {
  206. fmt.Println("toBtc=", err)
  207. continue
  208. }
  209. cb(ti)
  210. }
  211. return offset, nil
  212. }
  213. func toEthMK(data map[string]interface{}) (*Market, int64, error) {
  214. mk := &Market{}
  215. mk.Type = IntBty
  216. symbolId := int32(data["symbolid"].(float64))
  217. switch symbolId {
  218. case 327681:
  219. mk.InsId = markinfo.ETCCNY
  220. case 131073:
  221. mk.InsId = markinfo.BTCCNY
  222. case 262145:
  223. mk.InsId = markinfo.ETHCNY
  224. case 458753:
  225. mk.InsId = markinfo.SCCNY
  226. case 524289:
  227. mk.InsId = markinfo.ZECCNY
  228. case 589825:
  229. mk.InsId = markinfo.BTSCNY
  230. case 655361:
  231. mk.InsId = markinfo.LTCCNY
  232. case 720897:
  233. mk.InsId = markinfo.BCCCNY
  234. case 851969:
  235. mk.InsId = markinfo.NYCCCNY
  236. case 917505:
  237. mk.InsId = markinfo.WTCCNY
  238. case 262146:
  239. mk.InsId = markinfo.ETHBTC
  240. case 327682:
  241. mk.InsId = markinfo.ETCBTC
  242. case 524290:
  243. mk.InsId = markinfo.ZECBTC
  244. case 655362:
  245. mk.InsId = markinfo.LTCBTC
  246. case 720898:
  247. mk.InsId = markinfo.BCCBTC
  248. case 262159:
  249. mk.InsId = markinfo.ETHUSDT
  250. case 327695:
  251. mk.InsId = markinfo.ETCUSDT
  252. case 524303:
  253. mk.InsId = markinfo.ZECUSDT
  254. case 655375:
  255. mk.InsId = markinfo.LTCUSDT
  256. case 720911:
  257. mk.InsId = markinfo.BCCUSDT
  258. case 131087:
  259. mk.InsId = markinfo.BTCUSDT
  260. case 196623:
  261. mk.InsId = markinfo.BTYUSDT
  262. case 589839:
  263. mk.InsId = markinfo.BTSUSDT
  264. case 458767:
  265. mk.InsId = markinfo.SCUSDT
  266. case 196610:
  267. mk.InsId = markinfo.BTYBTC
  268. case 589826:
  269. mk.InsId = markinfo.BTSBTC
  270. case 458754:
  271. mk.InsId = markinfo.SCBTC
  272. case 786447:
  273. mk.InsId = markinfo.YCCUSDT
  274. case 1048591:
  275. mk.InsId = markinfo.BTCSUSDT
  276. case 1114127:
  277. mk.InsId = markinfo.DCRUSDT
  278. default:
  279. return nil, 0, errors.New("invalid symbolid")
  280. }
  281. mk.Timestamp = parseKTime(data["time"].(string))
  282. price := data["price"].(float64)
  283. volume, _ := data["quantity"].(float64)
  284. id := int64(data["id"].(float64))
  285. price /= (10000 * 10000)
  286. volume /= 100000000
  287. mk.Close = price
  288. mk.Open = price
  289. mk.High = price
  290. mk.Low = price
  291. mk.AllAmount = volume
  292. mk.AllVolume = volume
  293. mk.LastPrice = price
  294. mk.LastVolume = volume
  295. var ask, bid PP
  296. ask[0] = price
  297. ask[1] = volume
  298. bid[0] = price
  299. bid[1] = volume
  300. mk.Asks = append(mk.Asks, ask)
  301. mk.Bids = append(mk.Bids, bid)
  302. return mk, id, nil
  303. }
  304. func parseKTime(timeStr string) int64 {
  305. var year, month, day, hour, minute, second int
  306. fmt.Sscanf(timeStr, "%04d-%02d-%02d %02d:%02d:%02d", &year, &month, &day, &hour, &minute, &second)
  307. t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local)
  308. return t.Unix() * 1000
  309. }
  310. func toBtcMK(data map[string]interface{}) (*Market, error) {
  311. mk := &Market{}
  312. mk.Type = IntBty
  313. mk.InsId = markinfo.BTCCNY
  314. tick_time := data["time"]
  315. f_tick_time, _ := strconv.ParseFloat(tick_time.(string), 32)
  316. mk.Timestamp = int64(f_tick_time / (1e6))
  317. price := data["price"].(float64)
  318. volume := data["quantity"].(float64)
  319. price /= (100 * 10000)
  320. //volume /= 100
  321. mk.Close = price
  322. mk.Open = price
  323. mk.High = price
  324. mk.Low = price
  325. mk.AllAmount = volume
  326. mk.AllVolume = volume
  327. mk.LastPrice = price
  328. mk.LastVolume = volume
  329. var ask, bid PP
  330. ask[0] = price
  331. ask[1] = volume
  332. bid[0] = price
  333. bid[1] = volume
  334. mk.Asks = append(mk.Asks, ask)
  335. mk.Bids = append(mk.Bids, bid)
  336. return mk, nil
  337. }
  338. type TickRead struct {
  339. ch chan *Market
  340. err chan error
  341. }
  342. func NewTickRead() *TickRead {
  343. ch := make(chan *Market, 1024)
  344. errch := make(chan error)
  345. reader := &TickRead{}
  346. reader.ch = ch
  347. reader.err = errch
  348. return reader
  349. }
  350. func (tr *TickRead) Read() (*Market, error) {
  351. tick := <-tr.ch
  352. if tick == nil {
  353. return nil, <-tr.err
  354. }
  355. return tick, nil
  356. }
  357. func GetEthTick(instrumentId, offset int64) error {
  358. //reader := NewTickRead()
  359. lasttime := int64(0)
  360. go func() {
  361. for {
  362. offset_next, err := GetEthTickbyPage(instrumentId, offset, func(mk *Market) {
  363. if mk != nil {
  364. if mk.Timestamp >= lasttime {
  365. bty_reader.ch <- mk
  366. lasttime = mk.Timestamp
  367. }
  368. }
  369. })
  370. time.Sleep(time.Second)
  371. if err != nil {
  372. //log.Println("GetEthTick", err, offset)
  373. continue
  374. }
  375. if offset < offset_next {
  376. symbol, _ := markinfo.SymbolName(int(instrumentId))
  377. err := updateLastoffset(Bty, symbol, offset_next)
  378. if err != nil {
  379. continue
  380. }
  381. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  382. offset = offset_next
  383. }
  384. //if !isEndPage {
  385. //continue
  386. //}
  387. }
  388. }()
  389. return nil
  390. }
  391. func GetBtcTick(instrumentId, offset int64) error {
  392. //reader := NewTickRead()
  393. lasttime := int64(0)
  394. go func() {
  395. for {
  396. //offset, _ = getLastOffset(Bty, "BTCCNY")
  397. offset_next, err := GetBtcTickbyPage(offset, func(mk *Market) {
  398. if mk != nil {
  399. if mk.Timestamp >= lasttime {
  400. bty_reader.ch <- mk
  401. lasttime = mk.Timestamp
  402. }
  403. }
  404. })
  405. time.Sleep(time.Second)
  406. if err != nil {
  407. //log.Println("GetBtcTick", err, offset)
  408. continue
  409. }
  410. if offset < offset_next {
  411. symbol, _ := markinfo.SymbolName(int(instrumentId))
  412. err := updateLastoffset(Bty, symbol, offset_next)
  413. if err != nil {
  414. continue
  415. }
  416. //time.Sleep(time.Duration((offset_next-offset)/100) * time.Second)
  417. offset = offset_next
  418. }
  419. //if !isEndPage {
  420. //continue
  421. //}
  422. }
  423. }()
  424. return nil
  425. }
  426. /*创建数据表*/
  427. func checkBtyTable() error {
  428. sql := fmt.Sprintf("create table if not exists %s (ty varchar(10), insId varchar(20), lastOffset bigint)", btyTable)
  429. _, err := db.Exec(sql)
  430. if err != nil {
  431. return err
  432. }
  433. return nil
  434. }
  435. func getLastOffset(ty, insId string) (lastOffset int64, err error) {
  436. szSelectTable := "SELECT `lastOffset` FROM `" + btyTable + "` WHERE `ty` = '" + ty + "' AND `insId` = '" + insId + "';"
  437. row := db.QueryRow(szSelectTable)
  438. err = row.Scan(&lastOffset)
  439. if err == sql.ErrNoRows {
  440. q := fmt.Sprintf("INSERT INTO %s (ty, insId, lastOffset) values ('%s', '%s', '%d')", btyTable, ty, insId, lastOffset)
  441. _, err = db.Exec(q)
  442. if err != nil {
  443. fmt.Println("getLastOffset", err)
  444. return lastOffset, err
  445. }
  446. return lastOffset, nil
  447. }
  448. return lastOffset, err
  449. }
  450. func updateLastoffset(typ, insId string, lastOffset int64) error {
  451. //INSERT INTO
  452. q := fmt.Sprintf("UPDATE %s set lastOffset = '%d' where ty = '%s' and insId = '%s'", btyTable, lastOffset, typ, insId)
  453. _, err := db.Exec(q)
  454. if err != nil {
  455. fmt.Println("updateLastoffset", err)
  456. }
  457. return err
  458. }
  459. func GetMacTick(instrumentId, ntime int64) error {
  460. //reader := NewTickRead()
  461. go func() {
  462. for {
  463. err, _ntime, _ := GetMacTickbyPage(1, ntime, instrumentId, func(mk *Market) {
  464. if mk != nil {
  465. bty_reader.ch <- mk
  466. }
  467. })
  468. time.Sleep(time.Second)
  469. //fmt.Println(err, ntime, isEndPage)
  470. if err != nil {
  471. //log.Println("GetMacTick", err)
  472. continue
  473. }
  474. if ntime < _ntime {
  475. symbol, _ := markinfo.SymbolName(int(instrumentId))
  476. err := updateLastoffset(Bty, symbol, _ntime)
  477. if err != nil {
  478. continue
  479. }
  480. //time.Sleep(time.Duration(total/100) * time.Second)
  481. ntime = _ntime
  482. }
  483. //if !isEndPage {
  484. //continue
  485. //}
  486. }
  487. }()
  488. return nil
  489. }
  490. func GetMacTickbyPage(page int, time2 int64, instrumentId int64, cb func(*Market)) (error, int64, int64) {
  491. url := "https://zc.bityuan.com/site/tick2?count=500&Tick_page=" + fmt.Sprintf("%d", page) + "&Tick_sort=match_id.asc&time=" + fmt.Sprintf("%d", time2)
  492. //fmt.Println(url)
  493. response, err := http.Get(url)
  494. if err != nil {
  495. //fmt.Println("macoin", err)
  496. return err, 0, 0
  497. }
  498. defer response.Body.Close()
  499. body, err := ioutil.ReadAll(response.Body)
  500. if err != nil {
  501. //fmt.Println(err)
  502. return err, 0, 0
  503. }
  504. var data map[string]interface{}
  505. err = json.Unmarshal(body, &data)
  506. if err != nil {
  507. //fmt.Println("json Unmarshal", err)
  508. return err, 0, 0
  509. }
  510. total, err := strconv.ParseInt(data["total"].(string), 10, 32)
  511. if err != nil {
  512. //fmt.Println("ParseInt total ", err)
  513. return err, 0, 0
  514. }
  515. dataticks := data["data"].([]interface{})
  516. ntime := int64(0)
  517. for _, tmp := range dataticks {
  518. mk, err := toMacMK(tmp.(map[string]interface{}), instrumentId)
  519. if mk == nil || err != nil {
  520. continue
  521. }
  522. ntime = (mk.Timestamp / 1000) + 1
  523. if mk.LastPrice == 0.0 || mk.LastVolume == 0.0 {
  524. continue
  525. }
  526. cb(mk)
  527. }
  528. return nil, ntime, total
  529. }
  530. func toMacMK(data map[string]interface{}, instrumentId int64) (*Market, error) {
  531. mk := &Market{}
  532. mk.InsId = instrumentId
  533. mk.Type = IntBty
  534. timestamp, err := strconv.ParseInt(data["time"].(string), 10, 32)
  535. if err != nil {
  536. return nil, err
  537. }
  538. match_id, err := strconv.ParseInt(data["match_id"].(string), 10, 32)
  539. if err != nil {
  540. return nil, err
  541. }
  542. if last_offset > int64(match_id) {
  543. return nil, nil
  544. }
  545. last_offset = int64(match_id)
  546. mk.Timestamp = timestamp * 1000
  547. // value
  548. var datavalue map[string]interface{}
  549. var datasell map[string]interface{}
  550. err = json.Unmarshal([]byte(data["value"].(string)), &datavalue)
  551. if err != nil {
  552. return nil, err
  553. }
  554. if len(datavalue["sell"].([]interface{})) == 0 {
  555. return mk, nil
  556. }
  557. datasell = (datavalue["sell"].([]interface{})[0]).(map[string]interface{})
  558. price, err := strconv.ParseFloat(datasell["price"].(string), 64)
  559. if err != nil {
  560. return nil, err
  561. }
  562. price /= 1000
  563. volume, err := strconv.ParseInt(data["amount"].(string), 10, 64)
  564. if err != nil {
  565. return nil, err
  566. }
  567. if volume < 0 {
  568. volume = -volume
  569. }
  570. mk.AllAmount = float64(volume)
  571. mk.AllVolume = float64(volume)
  572. mk.Close = price
  573. mk.Open = price
  574. mk.High = price
  575. mk.Low = price
  576. mk.LastPrice = price
  577. mk.LastVolume = float64(volume)
  578. var ask, bid PP
  579. ask[0] = price
  580. ask[1] = float64(volume)
  581. bid[0] = price
  582. bid[1] = float64(volume)
  583. mk.Asks = append(mk.Asks, ask)
  584. mk.Bids = append(mk.Bids, bid)
  585. return mk, nil
  586. }