ds_tdx.go 23 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package tick
  3. /*
  4. #include <string.h>
  5. */
  6. import "C"
  7. // 本文件实现大智慧数据源接口, 实时数据和历史数据的获取和保存
  8. import (
  9. "bytes"
  10. "compress/zlib"
  11. "container/list"
  12. "encoding/binary"
  13. "errors"
  14. "fmt"
  15. "io"
  16. "log"
  17. "net"
  18. "os"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "unsafe"
  24. "tickserver/server/market"
  25. "golang.org/x/text/encoding/simplifiedchinese"
  26. )
const (
	// STOCK_PER_SERVER is the fixed number of symbol slots in one polling
	// group; fetchMarket starts one readTicks goroutine per group.
	STOCK_PER_SERVER = 50
	// FETCH_PER_MILLISECOND is the pause, in milliseconds, that readTicks
	// sleeps between two polling rounds.
	FETCH_PER_MILLISECOND = 100
)
// RecvDataHeader is the fixed-size header that precedes every server reply.
// readBuf decodes it in little-endian order before reading the payload.
type RecvDataHeader struct {
	CheckSum   int32   // readBuf rejects the reply unless this equals 7654321
	EncodeMode byte    // bit 0x10 set means the payload is zlib-compressed
	Tmp        [5]byte // not interpreted anywhere in this file
	Msgid      int16   // not interpreted anywhere in this file
	Size       int16   // number of payload bytes that follow the header
	DePackSize int16   // payload size after decompression
}
// TdxStockInfo is the raw per-security record returned by the instrument
// list request. getInstrument decodes it little-endian with a stride of
// 29 bytes per record.
type TdxStockInfo struct { // initialization data, 29 bytes
	Code     [6]byte // security code
	Rate     int16   // divisor applied to the volume in the real-time quote? 1 lot = n shares?
	Name     [8]byte // security name (decoded as GBK by getInstrument)
	W1       int16   // 5-day average volume (used for the volume-ratio calculation)
	W2       int16
	PriceMag byte    // number of decimal places of the price
	YClose   float32 // previous close
	W3       int16
	W4       int16
}
// QuanInfo is one rights/dividend (corporate action) record.
// NOTE(review): nothing in this file reads or fills these fields; the field
// meanings below are taken from the original author's notes.
type QuanInfo struct {
	style byte
	day   int32
	q1    float32
	q2    float32
	q3    float32
	q4    float32
	// style=1 (ex-rights / ex-dividend) (cash dividend, rights-issue price, bonus shares, rights-issue ratio);
	// style=2 (bonus / rights shares listed)
	// style=9 (transferred rights shares listed) (share-capital change) Q1=float before Q2=total shares before Q3=float after Q4=total shares after
	// style=3 (non-tradable shares listed) float before, total shares before, float after, total shares after
	// 3 cash dividend:3499.00 rights price:17468.00 bonus shares:4368.00 rights ratio:17468.00
	// ex-date  category  bonus shares  dividend  rights shares  rights price  float before  float after  total before  total after
	// 20120618 non-tradable shares listed 3499.0 4368.0 17468.0 17468.0
	// style=5 (share-capital change) float before, total shares before, float after, total shares after
	// ex-date  category  bonus shares  dividend  rights shares  rights price  float before  float after  total before  total after
	// 20120316 share-capital change 0.0 3499.0 0.0 17468.0
	// 0 002663 Date:20120316 5 cash dividend: 0.00 rights price: 0.00 bonus shares:3499.00 rights ratio:17468.00
}
// CaiWu holds a per-security data record embedded in Stock.
// NOTE(review): "cai wu" is pinyin for "finance", so this is presumably the
// financial-statement record; nothing in this file populates it - confirm.
type CaiWu struct {
	Mark byte
	code [6]byte
	LTG  float32 // number of tradable (floating) shares
	t1   int16
	t2   int16
	day1 int32
	day2 int32
	zl   [30]float32
}
// Stock is the in-memory entry kept for every security in TdxDS.stocks.
type Stock struct {
	// no=mark*1000000+code;
	// NOTE(review): getInstrument currently stores only the numeric code here,
	// without the market prefix described above - confirm which is intended.
	no      int32
	szOrsh  byte         // exchange selector passed to getInstrument (0 Shenzhen, 1 Shanghai)
	quanlen int          // length of the rights/dividend data
	gp      TdxStockInfo // raw record decoded by getInstrument
	quan    [80]QuanInfo
	cw      CaiWu
}
// TdxDS implements the dataSource interface and saves tdx historical and
// real-time data.
type TdxDS struct {
	*DSBase // embedded base; presumably supplies Save and insMap used below - verify
	conf    *DsConf
	// datetime is the last trading day read from bytes 42..46 of the
	// handshake reply in getConn; it is sent in the instrument-count request
	// and used as the date part in ParseTime.
	datetime uint32
	stocks   map[string]*Stock // keyed by the 6-character security code
	servers  []string          // de-duplicated server addresses from loadServers
	// serverlist holds the same addresses; getTickConn rotates it under mu so
	// that successive callers start with different servers.
	serverlist *list.List
	// symbolsGroup and exsGroup are parallel: group i of symbols is polled
	// with the exchange bytes in exsGroup[i]. Unused slots hold "".
	symbolsGroup [][STOCK_PER_SERVER]string
	exsGroup     [][STOCK_PER_SERVER]byte
	conn         net.Conn // bootstrap connection set by getConn, used by getInstrument
	// instrumentUpdated is set by updateInstruments to ask every readTicks
	// goroutine to exit. It is read and written without synchronization.
	instrumentUpdated bool
	goroutineNum      int        // number of readTicks goroutines started by fetchMarket
	statusCh          chan int   // readTicks sends 1 here when it exits on instrumentUpdated
	mu                sync.Mutex // guards serverlist in getTickConn
}
// init registers newTdxDS in the package-level drivers table under the Tdx key.
func init() {
	drivers[Tdx] = newTdxDS
}
  108. func newTdxDS(conf *DsConf) (DataSource, error) {
  109. tds := &TdxDS{
  110. DSBase: NewDsBase(conf),
  111. conf: conf,
  112. stocks: make(map[string]*Stock),
  113. serverlist: list.New(),
  114. instrumentUpdated: false,
  115. statusCh: make(chan int, 1),
  116. }
  117. var err error
  118. tds.servers, err = loadServers(conf.CfgFile)
  119. if err != nil {
  120. return nil, err
  121. }
  122. for _, v := range tds.servers {
  123. tds.serverlist.PushBack(v)
  124. }
  125. err = tds.getConn()
  126. if err != nil {
  127. return nil, err
  128. }
  129. err = tds.getInstrument(0)
  130. if err != nil {
  131. return nil, err
  132. }
  133. err = tds.getInstrument(1)
  134. if err != nil {
  135. return nil, err
  136. }
  137. tds.conn.Close()
  138. i := 0
  139. var symbols [STOCK_PER_SERVER]string
  140. var exs [STOCK_PER_SERVER]byte
  141. for k, v := range tds.stocks {
  142. symbols[i] = k
  143. exs[i] = v.szOrsh
  144. i++
  145. if i >= STOCK_PER_SERVER {
  146. i = 0
  147. tds.symbolsGroup = append(tds.symbolsGroup, symbols)
  148. tds.exsGroup = append(tds.exsGroup, exs)
  149. }
  150. }
  151. if i > 0 {
  152. for j := i; j < STOCK_PER_SERVER; j++ {
  153. symbols[j] = ""
  154. }
  155. tds.symbolsGroup = append(tds.symbolsGroup, symbols)
  156. tds.exsGroup = append(tds.exsGroup, exs)
  157. }
  158. return tds, nil
  159. }
// Name returns the driver identifier Tdx, the same key under which init
// registers this data source.
func (tds *TdxDS) Name() string {
	return Tdx
}
// Run starts the data source: it launches updateInstruments in the
// background and then starts one readTicks goroutine per symbol group via
// fetchMarket. It returns as soon as those goroutines have been started.
func (tds *TdxDS) Run() {
	log.Println("TdxDS.Run")
	// Debugging leftovers: single-symbol and fixed-group polling.
	//tds.stocks["000001"] = &Stock{}
	//tds.readTick(tds.conn, 0, "000001")
	//var symbols = [STOCK_PER_SERVER]string{"000716", "002216", "002329", "002481", "002495", "002507", "002570", "002626", "002650", "002661", "002719", "002732", "300146", "300149", "300381", "300401", "600073", "600186", "600298", "600305", "600419", "600429", "600597", "600866", "600872", "600873", "600887", "603020", "603288", "000019", "000557", "000568", "000596", "000729", "000752", "000799", "000848", "000858", "000869", "000929", "000995", "002304", "002387", "002461", "002646", "600059", "600084", "600090", "600132", "600197"}
	//var exs = [STOCK_PER_SERVER]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1}
	//tds.readTicks(symbols, exs)
	go tds.updateInstruments()
	tds.fetchMarket()
}
  173. func (tds *TdxDS) fetchMarket() {
  174. for i, v := range tds.symbolsGroup {
  175. go tds.readTicks(v, tds.exsGroup[i])
  176. tds.goroutineNum++
  177. }
  178. }
  179. func (tds *TdxDS) updateInstruments() {
  180. var err error
  181. ticker := time.Tick(time.Second * 30)
  182. for t := range ticker {
  183. if t.Hour() == 7 && t.Minute() == 0 { // 7:00重新连接服务器和获得股票信息
  184. tds.getConn()
  185. err = tds.getInstrument(0)
  186. if err != nil {
  187. log.Println(err)
  188. }
  189. err = tds.getInstrument(1)
  190. if err != nil {
  191. log.Println(err)
  192. }
  193. tds.conn.Close()
  194. i := 0
  195. tds.symbolsGroup = nil
  196. tds.exsGroup = nil
  197. var symbols [STOCK_PER_SERVER]string
  198. var exs [STOCK_PER_SERVER]byte
  199. for k, v := range tds.stocks {
  200. symbols[i] = k
  201. exs[i] = v.szOrsh
  202. i++
  203. if i >= STOCK_PER_SERVER {
  204. i = 0
  205. tds.symbolsGroup = append(tds.symbolsGroup, symbols)
  206. tds.exsGroup = append(tds.exsGroup, exs)
  207. }
  208. }
  209. if i > 0 {
  210. for j := i; j < STOCK_PER_SERVER; j++ {
  211. symbols[j] = ""
  212. }
  213. tds.symbolsGroup = append(tds.symbolsGroup, symbols)
  214. tds.exsGroup = append(tds.exsGroup, exs)
  215. }
  216. log.Println("updateInstruments begin")
  217. tds.instrumentUpdated = true
  218. for m := 0; m < tds.goroutineNum; m++ { //等待readTicks结束
  219. <-tds.statusCh
  220. }
  221. log.Println("updateInstruments end")
  222. tds.goroutineNum = 0
  223. tds.instrumentUpdated = false
  224. tds.fetchMarket()
  225. }
  226. }
  227. }
  228. func (tds *TdxDS) getTickConn() net.Conn {
  229. servernum := len(tds.servers)
  230. count := 0
  231. for {
  232. tds.mu.Lock()
  233. e := tds.serverlist.Front()
  234. server := tds.serverlist.Remove(e)
  235. tds.serverlist.PushBack(server)
  236. tds.mu.Unlock()
  237. count++
  238. if count > servernum {
  239. return nil
  240. }
  241. conn, err := net.DialTimeout("tcp", server.(string), 5*time.Second)
  242. if err != nil {
  243. log.Println(server.(string), err)
  244. continue
  245. }
  246. conn.(*net.TCPConn).SetDeadline(time.Now().Add(5 * time.Second))
  247. _, err = conn.Write([]byte("\x0C\x02\x18\x93\x00\x01\x03\x00\x03\x00\x0D\x00\x01"))
  248. if err != nil {
  249. log.Println(server.(string), err)
  250. continue
  251. }
  252. _, err = readBuf(conn)
  253. if err != nil {
  254. log.Println(server.(string), err)
  255. continue
  256. }
  257. //tds.datetime = binary.LittleEndian.Uint32(debuf[42:46])
  258. //log.Println("最后交易日:", tds.datetime)
  259. //log.Println("服务器名称:", decodeString(debuf[68:]))
  260. return conn
  261. }
  262. return nil
  263. }
  264. func (tds *TdxDS) getConn() error {
  265. for _, v := range tds.servers {
  266. conn, err := net.DialTimeout("tcp", v, 5*time.Second)
  267. if err != nil {
  268. log.Println(v, err)
  269. continue
  270. }
  271. conn.(*net.TCPConn).SetDeadline(time.Now().Add(5 * time.Second))
  272. _, err = conn.Write([]byte("\x0C\x02\x18\x93\x00\x01\x03\x00\x03\x00\x0D\x00\x01"))
  273. if err != nil {
  274. log.Println(v, err)
  275. continue
  276. }
  277. debuf, err := readBuf(conn)
  278. if err != nil {
  279. log.Println(v, err)
  280. continue
  281. }
  282. tds.datetime = binary.LittleEndian.Uint32(debuf[42:46])
  283. log.Println("最后交易日:", tds.datetime)
  284. log.Println("服务器名称:", decodeString(debuf[68:]))
  285. tds.conn = conn
  286. return nil
  287. }
  288. return errors.New("no conn available")
  289. }
  290. func decodeString(debuf []byte) string {
  291. var name []byte
  292. for i := 0; i < len(debuf); i++ {
  293. if int(debuf[i]) == 0 {
  294. name = debuf[0:i]
  295. break
  296. }
  297. }
  298. trans := simplifiedchinese.GBK.NewDecoder()
  299. dst := make([]byte, 1024)
  300. nDst, _, err := trans.Transform(dst, name, true)
  301. if err != nil {
  302. panic(err)
  303. }
  304. return string(dst[0:nDst])
  305. }
  306. func (tds *TdxDS) getInstrument(szOrsh byte) error {
  307. log.Println("getInstrument", szOrsh)
  308. bb := []byte("\x0C\x0C\x18\x6C\x00\x01\x08\x00\x08\x00\x4E\x04\xFF\x00\x01\x02\x03\x04") //取得股票数量
  309. bb[12] = szOrsh //0深圳 1上海
  310. wlen := len(bb)
  311. buf := bytes.NewBuffer(bb[14:])
  312. buf.Reset()
  313. binary.Write(buf, binary.LittleEndian, &tds.datetime)
  314. wlen, err := tds.conn.Write(bb[:wlen])
  315. if err != nil {
  316. return err
  317. }
  318. debuf, err := readBuf(tds.conn)
  319. if err != nil {
  320. return err
  321. }
  322. szcount := binary.LittleEndian.Uint16(debuf[:])
  323. //log.Println(wlen, szcount)
  324. trans := simplifiedchinese.GBK.NewDecoder()
  325. dst := make([]byte, 1024)
  326. var count uint16
  327. for count < szcount {
  328. bb11 := []byte("\x0C\x01\x18\x64\x01\x01\x06\x00\x06\x00\x50\x04\xFF\x00\xF2\xF3")
  329. bb11[12] = szOrsh
  330. wlen := len(bb11)
  331. buf := bytes.NewBuffer(bb11[14:])
  332. buf.Reset()
  333. binary.Write(buf, binary.LittleEndian, &count)
  334. wlen, err := tds.conn.Write(bb11[:wlen])
  335. if err != nil {
  336. continue
  337. }
  338. debuf, err := readBuf(tds.conn)
  339. if err != nil {
  340. return err
  341. }
  342. n := binary.LittleEndian.Uint16(debuf[:])
  343. stockInfoSize := int(unsafe.Sizeof(TdxStockInfo{}))
  344. stockInfoSize = 29
  345. var stock Stock
  346. for j := 0; j < int(n); j++ {
  347. buf := bytes.NewBuffer(debuf[2+j*stockInfoSize:])
  348. binary.Read(buf, binary.LittleEndian, &stock.gp)
  349. codeStr := string(stock.gp.Code[:])
  350. no, _ := strconv.Atoi(codeStr)
  351. stock.no = int32(no)
  352. stock.szOrsh = szOrsh
  353. _, ok := tds.stocks[codeStr]
  354. if !ok {
  355. tds.stocks[codeStr] = &stock
  356. ins, ok1 := tds.insMap[int64(stock.no)]
  357. if !ok1 {
  358. exid := SHEX
  359. if szOrsh == 0 {
  360. exid = SZEX
  361. }
  362. priceInc := float64(1.0)
  363. for i := 0; i < int(stock.gp.PriceMag); i++ {
  364. priceInc /= 10
  365. }
  366. nDst, _, _ := trans.Transform(dst, stock.gp.Name[:], true)
  367. ins = &Instrument{
  368. Id: int64(stock.no),
  369. Name: string(dst[0:nDst]),
  370. Type: market.Securities,
  371. ExId: exid,
  372. PriceInc: priceInc,
  373. }
  374. tds.insMap[ins.Id] = ins
  375. //log.Println(ins)
  376. }
  377. }
  378. count++
  379. }
  380. }
  381. return nil
  382. }
  383. func (tds *TdxDS) readTicks(symbols [STOCK_PER_SERVER]string, exs [STOCK_PER_SERVER]byte) {
  384. conn := tds.getTickConn()
  385. if conn == nil {
  386. log.Println("no conn available")
  387. return
  388. }
  389. defer conn.Close()
  390. lastDataMap := make(map[string]string)
  391. for {
  392. if tds.instrumentUpdated {
  393. tds.statusCh <- 1
  394. return
  395. }
  396. if !inTime() {
  397. time.Sleep(time.Second * 1)
  398. continue
  399. }
  400. //start := time.Now().UnixNano()
  401. var bb [800]byte
  402. bb2 := []byte("\x0C\x01\x20\x63\x00\x02\x13\x00\x13\x00\x3E\x05\x05\x00\x00\x00\x00\x00\x00\x00\x01\x00")
  403. C.memcpy(unsafe.Pointer(&bb[0]), unsafe.Pointer(&bb2[0]), C.size_t(len(bb2)))
  404. i := len(bb2)
  405. for index := 0; index < STOCK_PER_SERVER; index++ {
  406. if "" == symbols[index] {
  407. continue
  408. }
  409. bb[i] = exs[index]
  410. i++
  411. tmpsymbol := []byte(symbols[index])
  412. C.memcpy(unsafe.Pointer(&bb[i]), unsafe.Pointer(&tmpsymbol[0]), 6)
  413. i += 6
  414. }
  415. bb[20] = byte((i - 22) / 7) //数量
  416. len := uint16(i) - 10
  417. binary.LittleEndian.PutUint16(bb[6:], len)
  418. binary.LittleEndian.PutUint16(bb[8:], len)
  419. _, err := conn.Write(bb[:i])
  420. if err != nil {
  421. log.Println("readTicks.Write", err)
  422. conn.Close()
  423. conn = nil
  424. for {
  425. conn = tds.getTickConn()
  426. if conn != nil {
  427. break
  428. } else {
  429. time.Sleep(5 * time.Second)
  430. }
  431. }
  432. continue
  433. }
  434. debuf, err := readBuf(conn)
  435. if err != nil {
  436. log.Println("readTicks.Read", err)
  437. conn.Close()
  438. conn = nil
  439. for {
  440. conn = tds.getTickConn()
  441. if conn != nil {
  442. break
  443. } else {
  444. time.Sleep(5 * time.Second)
  445. }
  446. }
  447. continue
  448. }
  449. n := binary.LittleEndian.Uint16(debuf[2:])
  450. if n < 1 {
  451. log.Println("readTicks.n no data fetched")
  452. continue
  453. }
  454. buf := debuf[4:]
  455. i = 0
  456. var mks []*Market
  457. for j := 0; j < int(n); j++ {
  458. //m := buf[i]
  459. var code [8]byte
  460. C.memcpy(unsafe.Pointer(&code[0]), unsafe.Pointer(&buf[i+1]), 6)
  461. symbol := string(code[:6])
  462. mk := &Market{}
  463. mk.Type = IntTdx
  464. mk.InsId, _ = strconv.ParseInt(string(code[:6]), 10, 64)
  465. dd := float64(100.0)
  466. i += 9
  467. startPos := i
  468. mk.Close = float64(TDXDecode(buf, i, &i)) / dd
  469. mk.LastPrice = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  470. mk.Open = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  471. mk.High = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  472. mk.Low = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  473. TDXDecode(buf, i, &i) //Time := TDXDecode(buf, i, &i)
  474. mk.Timestamp = time.Now().Unix() * 1000 //tds.ParseTime(Time)
  475. TDXDecode(buf, i, &i)
  476. mk.LastVolume = float64(TDXDecode(buf, i, &i))
  477. TDXDecode(buf, i, &i) //现量
  478. mk.AllAmount = float64(TDXGetDouble(buf, i, &i))
  479. TDXDecode(buf, i, &i)
  480. TDXDecode(buf, i, &i)
  481. TDXDecode(buf, i, &i)
  482. TDXDecode(buf, i, &i)
  483. var bid, ask PP
  484. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  485. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  486. bid[1] = float64(TDXDecode(buf, i, &i))
  487. ask[1] = float64(TDXDecode(buf, i, &i))
  488. mk.Bids = append(mk.Bids, bid)
  489. mk.Asks = append(mk.Asks, ask)
  490. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  491. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  492. bid[1] = float64(TDXDecode(buf, i, &i))
  493. ask[1] = float64(TDXDecode(buf, i, &i))
  494. mk.Bids = append(mk.Bids, bid)
  495. mk.Asks = append(mk.Asks, ask)
  496. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  497. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  498. bid[1] = float64(TDXDecode(buf, i, &i))
  499. ask[1] = float64(TDXDecode(buf, i, &i))
  500. mk.Bids = append(mk.Bids, bid)
  501. mk.Asks = append(mk.Asks, ask)
  502. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  503. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  504. bid[1] = float64(TDXDecode(buf, i, &i))
  505. ask[1] = float64(TDXDecode(buf, i, &i))
  506. mk.Bids = append(mk.Bids, bid)
  507. mk.Asks = append(mk.Asks, ask)
  508. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  509. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  510. bid[1] = float64(TDXDecode(buf, i, &i))
  511. ask[1] = float64(TDXDecode(buf, i, &i))
  512. mk.Bids = append(mk.Bids, bid)
  513. mk.Asks = append(mk.Asks, ask)
  514. i += 3
  515. TDXDecode(buf, i, &i)
  516. TDXDecode(buf, i, &i)
  517. TDXDecode(buf, i, &i)
  518. TDXGetInt16(buf, i, &i)
  519. //float speed=(float)(t/100.0);
  520. TDXGetInt16(buf, i, &i)
  521. endPos := i
  522. dataStr := string(buf[startPos:endPos])
  523. bsame := false
  524. lastData, ok := lastDataMap[symbol]
  525. if ok {
  526. if lastData == dataStr {
  527. bsame = true
  528. }
  529. }
  530. if !bsame {
  531. mks = append(mks, mk)
  532. }
  533. lastDataMap[symbol] = dataStr
  534. }
  535. for _, v := range mks {
  536. //if v.InsId == 1 {
  537. //log.Println("[readTicks]data trace")
  538. //}
  539. tds.Save(v)
  540. }
  541. //end := time.Now().UnixNano()
  542. //log.Println("time used:", end-start)
  543. time.Sleep(time.Millisecond * FETCH_PER_MILLISECOND)
  544. }
  545. }
  546. /*
  547. func (tds *TdxDS) readTick(conn net.Conn, szOrsh byte, symbol string) ([]*Market, error) {
  548. bb1 := []byte("\x0C\x01\x20\x63\x00\x02\x13\x00\x13\x00\x3E\x05\x05\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x30\x30\x30\x30\x30\x31")
  549. bb1[22] = szOrsh //市场0深圳 1上海
  550. bBuf := bytes.NewBuffer(bb1[23:])
  551. bBuf.Reset()
  552. binary.Write(bBuf, binary.LittleEndian, symbol[:6])
  553. _, err := conn.Write(bb1)
  554. if err != nil {
  555. return nil, err
  556. }
  557. debuf, err := readBuf(conn)
  558. if err != nil {
  559. return nil, err
  560. }
  561. n := binary.LittleEndian.Uint16(debuf[2:])
  562. if n < 1 {
  563. return nil, errors.New("no data")
  564. }
  565. var i int
  566. buf := debuf[4:]
  567. var mks []*Market
  568. for j := 0; j < int(n); j++ {
  569. //m := buf[i]
  570. var code [8]byte
  571. C.memcpy(unsafe.Pointer(&code[0]), unsafe.Pointer(&buf[i+1]), 6)
  572. //codeStr := string(code[:6])
  573. //_, ok := tds.stocks[codeStr]
  574. //if !ok {
  575. //log.Println("invalid code:", codeStr)
  576. //continue
  577. //}
  578. mk := &Market{}
  579. mk.Type = IntTdx
  580. mk.InsId, _ = strconv.ParseInt(symbol, 10, 64)
  581. dd := float64(100.0)
  582. i += 9
  583. mk.Close = float64(TDXDecode(buf, i, &i)) / dd
  584. mk.LastPrice = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  585. mk.Open = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  586. mk.High = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  587. mk.Low = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  588. mk.Timestamp = tds.ParseTime(TDXDecode(buf, i, &i))
  589. TDXDecode(buf, i, &i)
  590. mk.LastVolume = float64(TDXDecode(buf, i, &i))
  591. TDXDecode(buf, i, &i) //现量
  592. mk.AllAmount = float64(TDXGetDouble(buf, i, &i))
  593. TDXDecode(buf, i, &i)
  594. TDXDecode(buf, i, &i)
  595. TDXDecode(buf, i, &i)
  596. TDXDecode(buf, i, &i)
  597. var bid, ask PP
  598. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  599. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  600. bid[1] = float64(TDXDecode(buf, i, &i))
  601. ask[1] = float64(TDXDecode(buf, i, &i))
  602. mk.Bids = append(mk.Bids, bid)
  603. mk.Asks = append(mk.Asks, ask)
  604. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  605. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  606. bid[1] = float64(TDXDecode(buf, i, &i))
  607. ask[1] = float64(TDXDecode(buf, i, &i))
  608. mk.Bids = append(mk.Bids, bid)
  609. mk.Asks = append(mk.Asks, ask)
  610. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  611. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  612. bid[1] = float64(TDXDecode(buf, i, &i))
  613. ask[1] = float64(TDXDecode(buf, i, &i))
  614. mk.Bids = append(mk.Bids, bid)
  615. mk.Asks = append(mk.Asks, ask)
  616. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  617. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  618. bid[1] = float64(TDXDecode(buf, i, &i))
  619. ask[1] = float64(TDXDecode(buf, i, &i))
  620. mk.Bids = append(mk.Bids, bid)
  621. mk.Asks = append(mk.Asks, ask)
  622. bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  623. ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
  624. bid[1] = float64(TDXDecode(buf, i, &i))
  625. ask[1] = float64(TDXDecode(buf, i, &i))
  626. mk.Bids = append(mk.Bids, bid)
  627. mk.Asks = append(mk.Asks, ask)
  628. i += 3
  629. TDXDecode(buf, i, &i)
  630. TDXDecode(buf, i, &i)
  631. TDXDecode(buf, i, &i)
  632. TDXGetInt16(buf, i, &i)
  633. //speed := float32(t) / 100.0
  634. TDXGetInt16(buf, i, &i)
  635. mks = append(mks, mk)
  636. //log.Println(mk)
  637. }
  638. return mks, nil
  639. }*/
  640. func (tds *TdxDS) ParseTime(tdxTime int32) int64 {
  641. tdxTimeStr := fmt.Sprintf("%d", tdxTime)
  642. tdxTimeBytes := []byte(tdxTimeStr)
  643. if tdxTimeBytes[0] != '1' && tdxTimeBytes[0] != '2' {
  644. tdxTimeStr = "0" + tdxTimeStr
  645. }
  646. datetimeStr := strconv.Itoa(int(tds.datetime)) + tdxTimeStr
  647. var year, month, day, hour, minute, second, millisecond int
  648. fmt.Sscanf(datetimeStr, "%04d%02d%02d%02d%02d%02d%d", &year, &month, &day, &hour, &minute, &second, &millisecond)
  649. if second > 59 {
  650. second -= 60
  651. minute++
  652. }
  653. //log.Println(datetimeStr, tdxTimeStr, tdxTimeBytes, tdxTime, year, month, day, hour, minute, second, millisecond)
  654. t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local)
  655. //t, _ := time.Parse("2015090909090909", datetimeStr)
  656. return t.Unix() * 1000
  657. }
  658. //解包数据
  659. func TDXDecode(buf []byte, start int, next *int) int32 {
  660. var num uint32
  661. var num3, num2, num4, num5, num6, num7, num8 int32
  662. var cc byte
  663. for num2 < 0x20 {
  664. cc = buf[int32(start)+num2]
  665. num4 = int32(cc)
  666. num5 = (num4 & 0x80) / 0x80
  667. if num2 == 0 {
  668. num3 = 1 - (((num4 & 0x40) / 0x40) * 2)
  669. num6 = num4 & 0x3F
  670. num = uint32(int64(num) + int64(num6))
  671. } else if num2 == 1 {
  672. num7 = (num4 & 0x7F) * (2 << (uint64(num2)*6 - 1)) // power(2, num2 * 6));
  673. num = uint32(int64(num) + int64(num7))
  674. } else {
  675. num8 = (num4 & 0x7F) * (2 << (uint64(num2)*7 - 2)) // Power(2, (num2 * 7) - 1);
  676. num = uint32(int64(num) + int64(num8))
  677. }
  678. if num5 == 0 {
  679. num = uint32(int64(num) * int64(num3))
  680. break
  681. }
  682. num2++
  683. }
  684. *next = start + int(num2) + 1
  685. return int32(num)
  686. }
  687. //读取16位数据
  688. func TDXGetInt16(buf []byte, start int, next *int) int16 {
  689. Num := binary.LittleEndian.Uint16(buf[start:])
  690. *next = start + 2
  691. return int16(Num)
  692. }
  693. //读取32位数据
  694. func TDXGetInt32(buf []byte, start int, next *int) int32 {
  695. Num := binary.LittleEndian.Uint32(buf[start:])
  696. *next = start + 4
  697. return int32(Num)
  698. }
  699. //读取浮点数据float
  700. func TDXGetDouble(buf []byte, start int, next *int) float32 {
  701. var Num float32
  702. bBuf := bytes.NewBuffer(buf[start:])
  703. binary.Read(bBuf, binary.LittleEndian, &Num)
  704. *next = start + 4
  705. return Num
  706. }
  707. //读取时间:HHMM
  708. func TDXGetTime(buf []byte, start int, next *int) int {
  709. i := TDXGetInt16(buf, start, next)
  710. mm := (i / 60)
  711. ss := (i % 60)
  712. if ss > 59 {
  713. ss = ss - 60
  714. mm++
  715. }
  716. ri := mm*100 + ss
  717. return int(ri)
  718. }
  719. func TDXGetDate(v int32, yy *int, mm *int, dd *int, hhh *int, mmm *int) {
  720. *yy = 2012
  721. *mm = 1
  722. *dd = 1
  723. *hhh = 9
  724. *mmm = 30
  725. if v > 21000000 {
  726. *yy = int(2004 + ((v & 0xF800) >> 11))
  727. d1 := v & 0x7FF
  728. *mm = int(d1 / 100)
  729. *dd = int(d1 % 100)
  730. d2 := v >> 16
  731. *hhh = int(d2 / 60)
  732. *mmm = int(d2 % 60)
  733. } else {
  734. *yy = int(v / 10000)
  735. *mm = (int(v) - *yy*10000) / 100
  736. *dd = int(v % 100)
  737. *hhh = 9
  738. *mmm = 30
  739. }
  740. } //解包数据
  741. func inTime() bool {
  742. t := time.Now()
  743. if t.Weekday() == time.Saturday || t.Weekday() == time.Sunday {
  744. return false
  745. }
  746. mm := t.Hour()*60 + t.Minute()
  747. for _, ti := range cffexTi {
  748. m1 := ti.st.hour*60 + ti.st.minute
  749. m2 := ti.et.hour*60 + ti.et.minute
  750. if mm >= m1 && mm <= m2 {
  751. return true
  752. }
  753. }
  754. return false
  755. }
  756. func readBuf(conn net.Conn) ([]byte, error) {
  757. var head RecvDataHeader
  758. err := binary.Read(conn, binary.LittleEndian, &head)
  759. if err != nil {
  760. return nil, err
  761. }
  762. if head.CheckSum != 7654321 {
  763. return nil, errors.New("error checksum")
  764. }
  765. buf := make([]byte, int(head.Size))
  766. n, err := io.ReadFull(conn, buf)
  767. if err != nil {
  768. return nil, err
  769. }
  770. if int(head.Size) != n {
  771. return nil, errors.New("read size error")
  772. }
  773. //log.Println(head)
  774. var debuf []byte
  775. if (head.EncodeMode & 0x10) == 0x10 { //gzip compress
  776. reader, err := zlib.NewReader(bytes.NewBuffer(buf))
  777. defer reader.Close()
  778. if err != nil {
  779. return nil, err
  780. }
  781. debuf = make([]byte, int(head.DePackSize))
  782. n, err := io.ReadFull(reader, debuf)
  783. if err != nil {
  784. return nil, err
  785. }
  786. if n != int(head.DePackSize) {
  787. return nil, errors.New("depack size error")
  788. }
  789. } else {
  790. debuf = buf
  791. }
  792. return debuf, nil
  793. }
  794. func loadServers(fname string) ([]string, error) {
  795. fp, err := os.Open(fname)
  796. if err != nil {
  797. return nil, err
  798. }
  799. defer fp.Close()
  800. fi, err := fp.Stat()
  801. if err != nil {
  802. return nil, err
  803. }
  804. buf := make([]byte, fi.Size())
  805. n, err := io.ReadFull(fp, buf)
  806. if err != nil {
  807. return nil, err
  808. }
  809. if n != len(buf) {
  810. return nil, errors.New("can't read all data")
  811. }
  812. var realservers []string
  813. servers := strings.Split(string(buf), "\n")
  814. //去重
  815. serversMap := make(map[string]int)
  816. for _, server := range servers {
  817. _, ok := serversMap[server]
  818. if !ok {
  819. realservers = append(realservers, server)
  820. serversMap[server] = 0
  821. } else {
  822. log.Println("duplicate server:", server)
  823. }
  824. }
  825. log.Println("servers:", len(servers), "realservers:", len(realservers))
  826. return realservers, nil
  827. }