|
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package tick
- /*
- #include <string.h>
- */
- import "C"
- // 本文件实现大智慧数据源接口, 实时数据和历史数据的获取和保存
- import (
- "bytes"
- "compress/zlib"
- "container/list"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "log"
- "net"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "unsafe"
- "tickserver/server/market"
- "golang.org/x/text/encoding/simplifiedchinese"
- )
- const (
- STOCK_PER_SERVER = 50
- FETCH_PER_MILLISECOND = 100
- )
- type RecvDataHeader struct {
- CheckSum int32
- EncodeMode byte
- Tmp [5]byte
- Msgid int16
- Size int16
- DePackSize int16
- }
- // 公司资料原始数据
- type TdxStockInfo struct { // 初始化数据 29字节
- Code [6]byte //代码
- Rate int16 // 实时盘口中的成交量除去的除数?1手=n股?
- Name [8]byte //名称
- W1 int16 //w1 为5日平均量(用于量比计算)
- W2 int16
- PriceMag byte //小数点位数
- YClose float32 //昨收
- W3 int16
- W4 int16
- }
- //权息
- type QuanInfo struct {
- style byte
- day int32
- q1 float32
- q2 float32
- q3 float32
- q4 float32
- //style=1 (除权除息) (送现金,配股价,送股数,配股比例);
- //style=2 (送配股上市)
- //style=9 (转配股上市) (股本变化) Q1=前流通盘 Q2=前总股本 Q3=后流通盘 Q4=后总股本
- //style=3 (非流通股上市) 前流通盘 前总股本 后流通盘 后总股本
- // 3 送现金:3499.00 配股价:17468.00 送股数:4368.00 配股比例:17468.00
- // 权息日 类别 送转股 分红 配股 配股价 前流通盘 后流通盘 前总股本 后总股本
- //20120618 非流通股上市 3499.0 4368.0 17468.0 17468.0
- //style=5 (股本变化)前流通盘 前总股本 后流通盘 后总股本
- // 权息日 类别 送转股 分红 配股 配股价 前流通盘 后流通盘 前总股本 后总股本
- //20120316 股本变化 0.0 3499.0 0.0 17468.0
- //0 002663 Date:20120316 5 送现金: 0.00 配股价: 0.00 送股数:3499.00 配股比例:17468.00
- }
- type CaiWu struct {
- Mark byte
- code [6]byte
- LTG float32 //流通股数量
- t1 int16
- t2 int16
- day1 int32
- day2 int32
- zl [30]float32
- }
- type Stock struct {
- no int32 //no=mark*1000000+code;
- szOrsh byte
- quanlen int //权息长度
- gp TdxStockInfo
- quan [80]QuanInfo
- cw CaiWu
- }
- // TdxDS实现了dataSource接口, 并对tdx的历史数据和实时数据保存
- type TdxDS struct {
- *DSBase
- conf *DsConf
- datetime uint32
- stocks map[string]*Stock
- servers []string
- serverlist *list.List
- symbolsGroup [][STOCK_PER_SERVER]string
- exsGroup [][STOCK_PER_SERVER]byte
- conn net.Conn
- instrumentUpdated bool
- goroutineNum int
- statusCh chan int
- mu sync.Mutex
- }
- func init() {
- drivers[Tdx] = newTdxDS
- }
- func newTdxDS(conf *DsConf) (DataSource, error) {
- tds := &TdxDS{
- DSBase: NewDsBase(conf),
- conf: conf,
- stocks: make(map[string]*Stock),
- serverlist: list.New(),
- instrumentUpdated: false,
- statusCh: make(chan int, 1),
- }
- var err error
- tds.servers, err = loadServers(conf.CfgFile)
- if err != nil {
- return nil, err
- }
- for _, v := range tds.servers {
- tds.serverlist.PushBack(v)
- }
- err = tds.getConn()
- if err != nil {
- return nil, err
- }
- err = tds.getInstrument(0)
- if err != nil {
- return nil, err
- }
- err = tds.getInstrument(1)
- if err != nil {
- return nil, err
- }
- tds.conn.Close()
- i := 0
- var symbols [STOCK_PER_SERVER]string
- var exs [STOCK_PER_SERVER]byte
- for k, v := range tds.stocks {
- symbols[i] = k
- exs[i] = v.szOrsh
- i++
- if i >= STOCK_PER_SERVER {
- i = 0
- tds.symbolsGroup = append(tds.symbolsGroup, symbols)
- tds.exsGroup = append(tds.exsGroup, exs)
- }
- }
- if i > 0 {
- for j := i; j < STOCK_PER_SERVER; j++ {
- symbols[j] = ""
- }
- tds.symbolsGroup = append(tds.symbolsGroup, symbols)
- tds.exsGroup = append(tds.exsGroup, exs)
- }
- return tds, nil
- }
- func (tds *TdxDS) Name() string {
- return Tdx
- }
- func (tds *TdxDS) Run() {
- log.Println("TdxDS.Run")
- //tds.stocks["000001"] = &Stock{}
- //tds.readTick(tds.conn, 0, "000001")
- //var symbols = [STOCK_PER_SERVER]string{"000716", "002216", "002329", "002481", "002495", "002507", "002570", "002626", "002650", "002661", "002719", "002732", "300146", "300149", "300381", "300401", "600073", "600186", "600298", "600305", "600419", "600429", "600597", "600866", "600872", "600873", "600887", "603020", "603288", "000019", "000557", "000568", "000596", "000729", "000752", "000799", "000848", "000858", "000869", "000929", "000995", "002304", "002387", "002461", "002646", "600059", "600084", "600090", "600132", "600197"}
- //var exs = [STOCK_PER_SERVER]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1}
- //tds.readTicks(symbols, exs)
- go tds.updateInstruments()
- tds.fetchMarket()
- }
- func (tds *TdxDS) fetchMarket() {
- for i, v := range tds.symbolsGroup {
- go tds.readTicks(v, tds.exsGroup[i])
- tds.goroutineNum++
- }
- }
- func (tds *TdxDS) updateInstruments() {
- var err error
- ticker := time.Tick(time.Second * 30)
- for t := range ticker {
- if t.Hour() == 7 && t.Minute() == 0 { // 7:00重新连接服务器和获得股票信息
- tds.getConn()
- err = tds.getInstrument(0)
- if err != nil {
- log.Println(err)
- }
- err = tds.getInstrument(1)
- if err != nil {
- log.Println(err)
- }
- tds.conn.Close()
- i := 0
- tds.symbolsGroup = nil
- tds.exsGroup = nil
- var symbols [STOCK_PER_SERVER]string
- var exs [STOCK_PER_SERVER]byte
- for k, v := range tds.stocks {
- symbols[i] = k
- exs[i] = v.szOrsh
- i++
- if i >= STOCK_PER_SERVER {
- i = 0
- tds.symbolsGroup = append(tds.symbolsGroup, symbols)
- tds.exsGroup = append(tds.exsGroup, exs)
- }
- }
- if i > 0 {
- for j := i; j < STOCK_PER_SERVER; j++ {
- symbols[j] = ""
- }
- tds.symbolsGroup = append(tds.symbolsGroup, symbols)
- tds.exsGroup = append(tds.exsGroup, exs)
- }
- log.Println("updateInstruments begin")
- tds.instrumentUpdated = true
- for m := 0; m < tds.goroutineNum; m++ { //等待readTicks结束
- <-tds.statusCh
- }
- log.Println("updateInstruments end")
- tds.goroutineNum = 0
- tds.instrumentUpdated = false
- tds.fetchMarket()
- }
- }
- }
- func (tds *TdxDS) getTickConn() net.Conn {
- servernum := len(tds.servers)
- count := 0
- for {
- tds.mu.Lock()
- e := tds.serverlist.Front()
- server := tds.serverlist.Remove(e)
- tds.serverlist.PushBack(server)
- tds.mu.Unlock()
- count++
- if count > servernum {
- return nil
- }
- conn, err := net.DialTimeout("tcp", server.(string), 5*time.Second)
- if err != nil {
- log.Println(server.(string), err)
- continue
- }
- conn.(*net.TCPConn).SetDeadline(time.Now().Add(5 * time.Second))
- _, err = conn.Write([]byte("\x0C\x02\x18\x93\x00\x01\x03\x00\x03\x00\x0D\x00\x01"))
- if err != nil {
- log.Println(server.(string), err)
- continue
- }
- _, err = readBuf(conn)
- if err != nil {
- log.Println(server.(string), err)
- continue
- }
- //tds.datetime = binary.LittleEndian.Uint32(debuf[42:46])
- //log.Println("最后交易日:", tds.datetime)
- //log.Println("服务器名称:", decodeString(debuf[68:]))
- return conn
- }
- return nil
- }
- func (tds *TdxDS) getConn() error {
- for _, v := range tds.servers {
- conn, err := net.DialTimeout("tcp", v, 5*time.Second)
- if err != nil {
- log.Println(v, err)
- continue
- }
- conn.(*net.TCPConn).SetDeadline(time.Now().Add(5 * time.Second))
- _, err = conn.Write([]byte("\x0C\x02\x18\x93\x00\x01\x03\x00\x03\x00\x0D\x00\x01"))
- if err != nil {
- log.Println(v, err)
- continue
- }
- debuf, err := readBuf(conn)
- if err != nil {
- log.Println(v, err)
- continue
- }
- tds.datetime = binary.LittleEndian.Uint32(debuf[42:46])
- log.Println("最后交易日:", tds.datetime)
- log.Println("服务器名称:", decodeString(debuf[68:]))
- tds.conn = conn
- return nil
- }
- return errors.New("no conn available")
- }
- func decodeString(debuf []byte) string {
- var name []byte
- for i := 0; i < len(debuf); i++ {
- if int(debuf[i]) == 0 {
- name = debuf[0:i]
- break
- }
- }
- trans := simplifiedchinese.GBK.NewDecoder()
- dst := make([]byte, 1024)
- nDst, _, err := trans.Transform(dst, name, true)
- if err != nil {
- panic(err)
- }
- return string(dst[0:nDst])
- }
- func (tds *TdxDS) getInstrument(szOrsh byte) error {
- log.Println("getInstrument", szOrsh)
- bb := []byte("\x0C\x0C\x18\x6C\x00\x01\x08\x00\x08\x00\x4E\x04\xFF\x00\x01\x02\x03\x04") //取得股票数量
- bb[12] = szOrsh //0深圳 1上海
- wlen := len(bb)
- buf := bytes.NewBuffer(bb[14:])
- buf.Reset()
- binary.Write(buf, binary.LittleEndian, &tds.datetime)
- wlen, err := tds.conn.Write(bb[:wlen])
- if err != nil {
- return err
- }
- debuf, err := readBuf(tds.conn)
- if err != nil {
- return err
- }
- szcount := binary.LittleEndian.Uint16(debuf[:])
- //log.Println(wlen, szcount)
- trans := simplifiedchinese.GBK.NewDecoder()
- dst := make([]byte, 1024)
- var count uint16
- for count < szcount {
- bb11 := []byte("\x0C\x01\x18\x64\x01\x01\x06\x00\x06\x00\x50\x04\xFF\x00\xF2\xF3")
- bb11[12] = szOrsh
- wlen := len(bb11)
- buf := bytes.NewBuffer(bb11[14:])
- buf.Reset()
- binary.Write(buf, binary.LittleEndian, &count)
- wlen, err := tds.conn.Write(bb11[:wlen])
- if err != nil {
- continue
- }
- debuf, err := readBuf(tds.conn)
- if err != nil {
- return err
- }
- n := binary.LittleEndian.Uint16(debuf[:])
- stockInfoSize := int(unsafe.Sizeof(TdxStockInfo{}))
- stockInfoSize = 29
- var stock Stock
- for j := 0; j < int(n); j++ {
- buf := bytes.NewBuffer(debuf[2+j*stockInfoSize:])
- binary.Read(buf, binary.LittleEndian, &stock.gp)
- codeStr := string(stock.gp.Code[:])
- no, _ := strconv.Atoi(codeStr)
- stock.no = int32(no)
- stock.szOrsh = szOrsh
- _, ok := tds.stocks[codeStr]
- if !ok {
- tds.stocks[codeStr] = &stock
- ins, ok1 := tds.insMap[int64(stock.no)]
- if !ok1 {
- exid := SHEX
- if szOrsh == 0 {
- exid = SZEX
- }
- priceInc := float64(1.0)
- for i := 0; i < int(stock.gp.PriceMag); i++ {
- priceInc /= 10
- }
- nDst, _, _ := trans.Transform(dst, stock.gp.Name[:], true)
- ins = &Instrument{
- Id: int64(stock.no),
- Name: string(dst[0:nDst]),
- Type: market.Securities,
- ExId: exid,
- PriceInc: priceInc,
- }
- tds.insMap[ins.Id] = ins
- //log.Println(ins)
- }
- }
- count++
- }
- }
- return nil
- }
- func (tds *TdxDS) readTicks(symbols [STOCK_PER_SERVER]string, exs [STOCK_PER_SERVER]byte) {
- conn := tds.getTickConn()
- if conn == nil {
- log.Println("no conn available")
- return
- }
- defer conn.Close()
- lastDataMap := make(map[string]string)
- for {
- if tds.instrumentUpdated {
- tds.statusCh <- 1
- return
- }
- if !inTime() {
- time.Sleep(time.Second * 1)
- continue
- }
- //start := time.Now().UnixNano()
- var bb [800]byte
- bb2 := []byte("\x0C\x01\x20\x63\x00\x02\x13\x00\x13\x00\x3E\x05\x05\x00\x00\x00\x00\x00\x00\x00\x01\x00")
- C.memcpy(unsafe.Pointer(&bb[0]), unsafe.Pointer(&bb2[0]), C.size_t(len(bb2)))
- i := len(bb2)
- for index := 0; index < STOCK_PER_SERVER; index++ {
- if "" == symbols[index] {
- continue
- }
- bb[i] = exs[index]
- i++
- tmpsymbol := []byte(symbols[index])
- C.memcpy(unsafe.Pointer(&bb[i]), unsafe.Pointer(&tmpsymbol[0]), 6)
- i += 6
- }
- bb[20] = byte((i - 22) / 7) //数量
- len := uint16(i) - 10
- binary.LittleEndian.PutUint16(bb[6:], len)
- binary.LittleEndian.PutUint16(bb[8:], len)
- _, err := conn.Write(bb[:i])
- if err != nil {
- log.Println("readTicks.Write", err)
- conn.Close()
- conn = nil
- for {
- conn = tds.getTickConn()
- if conn != nil {
- break
- } else {
- time.Sleep(5 * time.Second)
- }
- }
- continue
- }
- debuf, err := readBuf(conn)
- if err != nil {
- log.Println("readTicks.Read", err)
- conn.Close()
- conn = nil
- for {
- conn = tds.getTickConn()
- if conn != nil {
- break
- } else {
- time.Sleep(5 * time.Second)
- }
- }
- continue
- }
- n := binary.LittleEndian.Uint16(debuf[2:])
- if n < 1 {
- log.Println("readTicks.n no data fetched")
- continue
- }
- buf := debuf[4:]
- i = 0
- var mks []*Market
- for j := 0; j < int(n); j++ {
- //m := buf[i]
- var code [8]byte
- C.memcpy(unsafe.Pointer(&code[0]), unsafe.Pointer(&buf[i+1]), 6)
- symbol := string(code[:6])
- mk := &Market{}
- mk.Type = IntTdx
- mk.InsId, _ = strconv.ParseInt(string(code[:6]), 10, 64)
- dd := float64(100.0)
- i += 9
- startPos := i
- mk.Close = float64(TDXDecode(buf, i, &i)) / dd
- mk.LastPrice = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.Open = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.High = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.Low = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- TDXDecode(buf, i, &i) //Time := TDXDecode(buf, i, &i)
- mk.Timestamp = time.Now().Unix() * 1000 //tds.ParseTime(Time)
- TDXDecode(buf, i, &i)
- mk.LastVolume = float64(TDXDecode(buf, i, &i))
- TDXDecode(buf, i, &i) //现量
- mk.AllAmount = float64(TDXGetDouble(buf, i, &i))
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- var bid, ask PP
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- i += 3
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXGetInt16(buf, i, &i)
- //float speed=(float)(t/100.0);
- TDXGetInt16(buf, i, &i)
- endPos := i
- dataStr := string(buf[startPos:endPos])
- bsame := false
- lastData, ok := lastDataMap[symbol]
- if ok {
- if lastData == dataStr {
- bsame = true
- }
- }
- if !bsame {
- mks = append(mks, mk)
- }
- lastDataMap[symbol] = dataStr
- }
- for _, v := range mks {
- //if v.InsId == 1 {
- //log.Println("[readTicks]data trace")
- //}
- tds.Save(v)
- }
- //end := time.Now().UnixNano()
- //log.Println("time used:", end-start)
- time.Sleep(time.Millisecond * FETCH_PER_MILLISECOND)
- }
- }
- /*
- func (tds *TdxDS) readTick(conn net.Conn, szOrsh byte, symbol string) ([]*Market, error) {
- bb1 := []byte("\x0C\x01\x20\x63\x00\x02\x13\x00\x13\x00\x3E\x05\x05\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x30\x30\x30\x30\x30\x31")
- bb1[22] = szOrsh //市场0深圳 1上海
- bBuf := bytes.NewBuffer(bb1[23:])
- bBuf.Reset()
- binary.Write(bBuf, binary.LittleEndian, symbol[:6])
- _, err := conn.Write(bb1)
- if err != nil {
- return nil, err
- }
- debuf, err := readBuf(conn)
- if err != nil {
- return nil, err
- }
- n := binary.LittleEndian.Uint16(debuf[2:])
- if n < 1 {
- return nil, errors.New("no data")
- }
- var i int
- buf := debuf[4:]
- var mks []*Market
- for j := 0; j < int(n); j++ {
- //m := buf[i]
- var code [8]byte
- C.memcpy(unsafe.Pointer(&code[0]), unsafe.Pointer(&buf[i+1]), 6)
- //codeStr := string(code[:6])
- //_, ok := tds.stocks[codeStr]
- //if !ok {
- //log.Println("invalid code:", codeStr)
- //continue
- //}
- mk := &Market{}
- mk.Type = IntTdx
- mk.InsId, _ = strconv.ParseInt(symbol, 10, 64)
- dd := float64(100.0)
- i += 9
- mk.Close = float64(TDXDecode(buf, i, &i)) / dd
- mk.LastPrice = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.Open = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.High = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.Low = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- mk.Timestamp = tds.ParseTime(TDXDecode(buf, i, &i))
- TDXDecode(buf, i, &i)
- mk.LastVolume = float64(TDXDecode(buf, i, &i))
- TDXDecode(buf, i, &i) //现量
- mk.AllAmount = float64(TDXGetDouble(buf, i, &i))
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- var bid, ask PP
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- bid[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- ask[0] = mk.Close + float64(TDXDecode(buf, i, &i))/dd
- bid[1] = float64(TDXDecode(buf, i, &i))
- ask[1] = float64(TDXDecode(buf, i, &i))
- mk.Bids = append(mk.Bids, bid)
- mk.Asks = append(mk.Asks, ask)
- i += 3
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXDecode(buf, i, &i)
- TDXGetInt16(buf, i, &i)
- //speed := float32(t) / 100.0
- TDXGetInt16(buf, i, &i)
- mks = append(mks, mk)
- //log.Println(mk)
- }
- return mks, nil
- }*/
- func (tds *TdxDS) ParseTime(tdxTime int32) int64 {
- tdxTimeStr := fmt.Sprintf("%d", tdxTime)
- tdxTimeBytes := []byte(tdxTimeStr)
- if tdxTimeBytes[0] != '1' && tdxTimeBytes[0] != '2' {
- tdxTimeStr = "0" + tdxTimeStr
- }
- datetimeStr := strconv.Itoa(int(tds.datetime)) + tdxTimeStr
- var year, month, day, hour, minute, second, millisecond int
- fmt.Sscanf(datetimeStr, "%04d%02d%02d%02d%02d%02d%d", &year, &month, &day, &hour, &minute, &second, &millisecond)
- if second > 59 {
- second -= 60
- minute++
- }
- //log.Println(datetimeStr, tdxTimeStr, tdxTimeBytes, tdxTime, year, month, day, hour, minute, second, millisecond)
- t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local)
- //t, _ := time.Parse("2015090909090909", datetimeStr)
- return t.Unix() * 1000
- }
- //解包数据
- func TDXDecode(buf []byte, start int, next *int) int32 {
- var num uint32
- var num3, num2, num4, num5, num6, num7, num8 int32
- var cc byte
- for num2 < 0x20 {
- cc = buf[int32(start)+num2]
- num4 = int32(cc)
- num5 = (num4 & 0x80) / 0x80
- if num2 == 0 {
- num3 = 1 - (((num4 & 0x40) / 0x40) * 2)
- num6 = num4 & 0x3F
- num = uint32(int64(num) + int64(num6))
- } else if num2 == 1 {
- num7 = (num4 & 0x7F) * (2 << (uint64(num2)*6 - 1)) // power(2, num2 * 6));
- num = uint32(int64(num) + int64(num7))
- } else {
- num8 = (num4 & 0x7F) * (2 << (uint64(num2)*7 - 2)) // Power(2, (num2 * 7) - 1);
- num = uint32(int64(num) + int64(num8))
- }
- if num5 == 0 {
- num = uint32(int64(num) * int64(num3))
- break
- }
- num2++
- }
- *next = start + int(num2) + 1
- return int32(num)
- }
- //读取16位数据
- func TDXGetInt16(buf []byte, start int, next *int) int16 {
- Num := binary.LittleEndian.Uint16(buf[start:])
- *next = start + 2
- return int16(Num)
- }
- //读取32位数据
- func TDXGetInt32(buf []byte, start int, next *int) int32 {
- Num := binary.LittleEndian.Uint32(buf[start:])
- *next = start + 4
- return int32(Num)
- }
- //读取浮点数据float
- func TDXGetDouble(buf []byte, start int, next *int) float32 {
- var Num float32
- bBuf := bytes.NewBuffer(buf[start:])
- binary.Read(bBuf, binary.LittleEndian, &Num)
- *next = start + 4
- return Num
- }
- //读取时间:HHMM
- func TDXGetTime(buf []byte, start int, next *int) int {
- i := TDXGetInt16(buf, start, next)
- mm := (i / 60)
- ss := (i % 60)
- if ss > 59 {
- ss = ss - 60
- mm++
- }
- ri := mm*100 + ss
- return int(ri)
- }
- func TDXGetDate(v int32, yy *int, mm *int, dd *int, hhh *int, mmm *int) {
- *yy = 2012
- *mm = 1
- *dd = 1
- *hhh = 9
- *mmm = 30
- if v > 21000000 {
- *yy = int(2004 + ((v & 0xF800) >> 11))
- d1 := v & 0x7FF
- *mm = int(d1 / 100)
- *dd = int(d1 % 100)
- d2 := v >> 16
- *hhh = int(d2 / 60)
- *mmm = int(d2 % 60)
- } else {
- *yy = int(v / 10000)
- *mm = (int(v) - *yy*10000) / 100
- *dd = int(v % 100)
- *hhh = 9
- *mmm = 30
- }
- } //解包数据
- func inTime() bool {
- t := time.Now()
- if t.Weekday() == time.Saturday || t.Weekday() == time.Sunday {
- return false
- }
- mm := t.Hour()*60 + t.Minute()
- for _, ti := range cffexTi {
- m1 := ti.st.hour*60 + ti.st.minute
- m2 := ti.et.hour*60 + ti.et.minute
- if mm >= m1 && mm <= m2 {
- return true
- }
- }
- return false
- }
- func readBuf(conn net.Conn) ([]byte, error) {
- var head RecvDataHeader
- err := binary.Read(conn, binary.LittleEndian, &head)
- if err != nil {
- return nil, err
- }
- if head.CheckSum != 7654321 {
- return nil, errors.New("error checksum")
- }
- buf := make([]byte, int(head.Size))
- n, err := io.ReadFull(conn, buf)
- if err != nil {
- return nil, err
- }
- if int(head.Size) != n {
- return nil, errors.New("read size error")
- }
- //log.Println(head)
- var debuf []byte
- if (head.EncodeMode & 0x10) == 0x10 { //gzip compress
- reader, err := zlib.NewReader(bytes.NewBuffer(buf))
- defer reader.Close()
- if err != nil {
- return nil, err
- }
- debuf = make([]byte, int(head.DePackSize))
- n, err := io.ReadFull(reader, debuf)
- if err != nil {
- return nil, err
- }
- if n != int(head.DePackSize) {
- return nil, errors.New("depack size error")
- }
- } else {
- debuf = buf
- }
- return debuf, nil
- }
- func loadServers(fname string) ([]string, error) {
- fp, err := os.Open(fname)
- if err != nil {
- return nil, err
- }
- defer fp.Close()
- fi, err := fp.Stat()
- if err != nil {
- return nil, err
- }
- buf := make([]byte, fi.Size())
- n, err := io.ReadFull(fp, buf)
- if err != nil {
- return nil, err
- }
- if n != len(buf) {
- return nil, errors.New("can't read all data")
- }
- var realservers []string
- servers := strings.Split(string(buf), "\n")
- //去重
- serversMap := make(map[string]int)
- for _, server := range servers {
- _, ok := serversMap[server]
- if !ok {
- realservers = append(realservers, server)
- serversMap[server] = 0
- } else {
- log.Println("duplicate server:", server)
- }
- }
- log.Println("servers:", len(servers), "realservers:", len(realservers))
- return realservers, nil
- }
|