123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package client
- // 本文件为简化客户端的实现, 对Server服务调用进行封装
- import (
- "bytes"
- "encoding/binary"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "math/rand"
- "net"
- "net/http"
- "net/url"
- "os"
- "path"
- "sync"
- "time"
- )
- // Client对Server的rpc调用进行封装
- // 并保存下载的历史数据, 避免重复下载
- // 使用数据库管理下载的数据
- type ClientSimple struct {
- clientSub net.Conn // 由于服务器行情数据较多, 为避免和其他数据冲突, 采用新的连接单独处理行情数据
- enc encoder
- dec decoder
- insMap map[string]*Instrument // 产品列表
- httpComAddr, subAddr, httpAddr string // 服务器地址
- mapSubs map[string]int64
- dir string
- mu sync.Mutex
- }
- var instance *ClientSimple
- var mu sync.Mutex
- func GetClientInstance(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) {
- mu.Lock()
- defer mu.Unlock()
- if instance != nil {
- return instance, nil
- }
- var err error
- instance, err = NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir)
- if err != nil {
- instance = nil
- }
- return instance, err
- }
- //简化下载流程,去掉数据库,直接判断文件是否存在来查看本地是否有cache
- func NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) {
- rand.Seed(time.Now().UnixNano())
- c := &ClientSimple{
- insMap: make(map[string]*Instrument),
- httpComAddr: rpcAddr,
- subAddr: subAddr,
- httpAddr: httpAddr,
- dir: dir,
- mapSubs: make(map[string]int64),
- }
- err := c.connectSub(subAddr)
- if err != nil {
- return nil, err
- }
- err = c.getInsMap()
- if err != nil {
- return nil, err
- }
- go c.doInput()
- go c.doSubTest()
- return c, nil
- }
- func (c *ClientSimple) GetInsName(insId string) string {
- return c.insMap[insId].Name
- }
- func (c *ClientSimple) getInsMap() error {
- u := fmt.Sprintf("http://%s/instruments", c.httpComAddr)
- log.Println("beg", u)
- resp, err := http.Get(u)
- log.Println("end", u)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- json.Unmarshal(data, &c.insMap)
- return nil
- }
- func (c *ClientSimple) reConnectSub(err error) {
- c.clientSub.Close()
- for err != nil {
- log.Println("client.reConnectSub error:", err)
- err = c.connectSub(c.subAddr)
- time.Sleep(time.Second * 10)
- }
- for insId, h := range c.mapSubs {
- c.subMarket(insId, h)
- }
- }
- func (c *ClientSimple) connectSub(saddr string) error {
- clientSub, err := net.Dial("tcp", saddr)
- if err != nil {
- return err
- }
- enc := json.NewEncoder(clientSub)
- dec := json.NewDecoder(clientSub)
- c.clientSub = clientSub
- c.enc = enc
- c.dec = dec
- return nil
- }
- func (c *ClientSimple) GetInsMap() map[string]*Instrument {
- return c.insMap
- }
- func (c *ClientSimple) GetIns(insId string) *Instrument {
- return c.insMap[insId]
- }
- // downloadOne下载一个文件
- // 可以同时调用下载多个文件以增加下载效率
- // 参数done用于指明此函数调用是否完成
- func (c *ClientSimple) downloadOne(insId, date string, period int) (string, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- var u string
- if period == D1 {
- u = fmt.Sprintf("http://%s/%s/%s/D1.gz", c.httpAddr, InsIdPrefix(insId), insId)
- } else {
- var year, month, day int
- fmt.Sscanf(date, "%04d%02d%02d", &year, &month, &day)
- yearStr := fmt.Sprintf("%04d", year)
- periodStr := PeriodNameMap[period]
- if period == TK {
- periodStr = "tk"
- }
- u = fmt.Sprintf("http://%s/%s/%s/%s/%s.%s.gz", c.httpAddr, InsIdPrefix(insId), insId, yearStr, date, periodStr)
- }
- surl, err := url.Parse(u)
- if err != nil {
- return "", err
- }
- fname := path.Join(c.dir, surl.Path)
- dir := path.Dir(fname)
- os.MkdirAll(dir, 0777)
- if isExist(fname, date, period) {
- return fname, nil
- }
- res, err := http.Get(u)
- if err != nil {
- return "", err
- }
- defer res.Body.Close()
- w, err := os.Create(fname)
- if err != nil {
- return "", err
- }
- defer w.Close()
- _, err = io.Copy(w, res.Body)
- if err != nil {
- return "", err
- }
- return fname, nil
- }
- func isExist(fname, date string, period int) bool {
- if period == D1 {
- return false
- }
- if _, err := os.Stat(fname); os.IsNotExist(err) {
- return false
- }
- if period == TK {
- today := time.Now()
- todayDate := fmt.Sprintf("%04d%02d%02d", today.Year(), today.Month(), today.Day())
- yesterday := today.AddDate(0, 0, -1)
- yesterdayDate := fmt.Sprintf("%04d%02d%02d", yesterday.Year(), yesterday.Month(), yesterday.Day())
- if (date == todayDate) || (date == yesterdayDate) {
- return false
- } else {
- return true
- }
- }
- return true
- }
- func (c *ClientSimple) GetTickHistory(insId string, n int, ts int64) ([]Tick, error) {
- ticks, _, err := c.GetHistory(insId, 0, n, ts)
- return ticks, err
- }
- func (c *ClientSimple) GetCandleHistory(insId string, period, n int, ts int64) ([]Candle, error) {
- _, candles, err := c.GetHistory(insId, period, n, ts)
- return candles, err
- }
- func (c *ClientSimple) GetHis(insId string, period, n int, timestamp int64) (io.ReadCloser, error) {
- if timestamp < 0 {
- timestamp = time.Now().Unix() * 1000
- }
- if n == 0 {
- return nil, errors.New("no data needed.")
- }
- var tBegin int64
- if n < 0 {
- tBegin = 0
- }
- if n > 0 {
- tBegin = timestamp - 3600*24*1000
- }
- t := time.Unix(tBegin/1000, 0)
- periodStr := PeriodNameMap[period]
- if period == TK {
- periodStr = "tk"
- }
- u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day())
- resp, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var timelist []string
- json.Unmarshal(data, &timelist)
- if period == D1 {
- timelist = append(timelist, "xxx")
- }
- var fileIndex int
- if n < 0 {
- fileIndex = len(timelist) - 1
- tPoint := time.Unix(timestamp/1000+3600*24, 0)
- datePoint := fmt.Sprintf("%04d%02d%02d", tPoint.Year(), tPoint.Month(), tPoint.Day())
- for i, v := range timelist {
- if v >= datePoint {
- fileIndex = i
- break
- }
- }
- }
- return &HistoryResponse{
- clientsimple: c,
- timelist: timelist[:],
- insId: insId,
- period: period,
- n: n,
- ts: timestamp,
- fileIndex: fileIndex,
- buf: bytes.NewBuffer([]byte("")),
- }, nil
- }
- func (c *ClientSimple) GetHisEx(insId string, period int, st, et int64) (io.ReadCloser, error) {
- tBegin := st - 3600*24*1000
- t := time.Unix(tBegin/1000, 0)
- periodStr := PeriodNameMap[period]
- if period == TK {
- periodStr = "tk"
- }
- u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day())
- log.Println("down beg", u)
- resp, err := http.Get(u)
- log.Println("down end", u, err)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var timelist []string
- json.Unmarshal(data, &timelist)
- if period == D1 {
- timelist = append(timelist, "xxx")
- }
- return &HistoryResponseEx{
- clientsimple: c,
- timelist: timelist[:],
- insId: insId,
- period: period,
- st: st,
- et: et,
- buf: bytes.NewBuffer([]byte("")),
- }, nil
- }
- //GetHistory函数封装了rpc的调用, 并对结果做相应的处理
- func (c *ClientSimple) GetHistory(insId string, period, nn int, ts int64) ([]Tick, []Candle, error) {
- if period < M1 {
- period = 0
- }
- resp, err := c.GetHis(insId, period, nn, ts)
- if err != nil {
- //log.Println("@@@@@", err)
- return nil, nil, err
- }
- if period == 0 {
- if nn < 0 {
- nn = -nn
- }
- var ticks []Tick
- for {
- var tick Tick
- err = binary.Read(resp, binary.LittleEndian, &tick)
- if err != nil {
- if err != io.EOF {
- log.Println(err)
- }
- break
- }
- ticks = append(ticks, tick)
- }
- if len(ticks) < nn {
- return ticks, nil, ErrNotEnough
- }
- return ticks, nil, nil
- }
- if nn < 0 {
- nn = -nn
- }
- var candles []Candle
- for {
- var candle Candle
- err = binary.Read(resp, binary.LittleEndian, &candle)
- if err != nil {
- if err != io.EOF {
- log.Println(err)
- }
- break
- }
- candles = append(candles, candle)
- }
- if len(candles) < nn {
- return nil, candles, ErrNotEnough
- }
- return nil, candles, nil
- }
- func (c *ClientSimple) GetHistoryEx(insId string, period int, st, et int64) ([]Tick, []Candle, error) {
- if period < M1 {
- period = 0
- }
- resp, err := c.GetHisEx(insId, period, st, et)
- if err != nil {
- log.Println("@@@@@", err)
- return nil, nil, err
- }
- if period == 0 {
- var ticks []Tick
- for {
- var tick Tick
- err = binary.Read(resp, binary.LittleEndian, &tick)
- if err != nil {
- if err != io.EOF {
- log.Println(err)
- }
- break
- }
- if tick.Timestamp < st {
- continue
- }
- if tick.Timestamp > et {
- break
- }
- ticks = append(ticks, tick)
- }
- return ticks, nil, nil
- }
- var candles []Candle
- for {
- var candle Candle
- err = binary.Read(resp, binary.LittleEndian, &candle)
- if err != nil {
- if err != io.EOF {
- log.Println(err)
- }
- break
- }
- if candle.Timestamp < st {
- continue
- }
- if candle.Timestamp > et {
- break
- }
- candles = append(candles, candle)
- }
- return nil, candles, nil
- }
- func (c *ClientSimple) GetLastTicks(insId string, n int) ([]Tick, error) {
- u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK])
- resp, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var ticks []Tick
- json.Unmarshal(data, &ticks)
- p := len(ticks) - n
- if p < 0 {
- p = 0
- }
- return ticks[p:], nil
- }
- func (c *ClientSimple) GetLastCandles(insId string, period, n int) ([]Candle, error) {
- u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period])
- resp, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var candles []Candle
- json.Unmarshal(data, &candles)
- p := len(candles) - n
- if p < 0 {
- p = 0
- }
- return candles[p:], nil
- }
- func (c *ClientSimple) GetLastTicksByTime(insId string, st, et int64) ([]Tick, error) {
- u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK])
- resp, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var ticks []Tick
- json.Unmarshal(data, &ticks)
- tLen := len(ticks)
- if tLen <= 0 {
- return nil, nil
- }
- tSt := ticks[0].Timestamp
- tEt := ticks[tLen-1].Timestamp
- if st > tEt || et < tSt {
- return nil, nil
- }
- begin, end := -1, -1
- for i, v := range ticks {
- if v.Timestamp > et {
- break
- }
- if v.Timestamp >= st {
- if begin == -1 {
- begin = i
- } else {
- end = i
- }
- }
- }
- if begin != -1 && end != -1 {
- return ticks[begin:end], nil
- } else {
- return nil, nil
- }
- }
- func (c *ClientSimple) GetLastCandlesByTime(insId string, period int, st, et int64) ([]Candle, error) {
- u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period])
- resp, err := http.Get(u)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- data, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var candles []Candle
- json.Unmarshal(data, &candles)
- cLen := len(candles)
- if cLen <= 0 {
- return nil, nil
- }
- cSt := candles[0].Timestamp
- cEt := candles[cLen-1].Timestamp
- if st > cEt || et < cSt {
- return nil, nil
- }
- begin, end := -1, -1
- for i, v := range candles {
- if v.Timestamp > et {
- break
- }
- if v.Timestamp >= st {
- if begin == -1 {
- begin = i
- } else {
- end = i
- }
- }
- }
- if begin != -1 && end != -1 {
- return candles[begin:end], nil
- } else {
- return nil, nil
- }
- }
- func (c *ClientSimple) doSubTest() {
- t := time.Tick(time.Second * 30)
- for _ = range t {
- c.subMarket("0", 0)
- }
- }
- func (c *ClientSimple) doInput() {
- type MC struct {
- ch chan *Market
- }
- mcCh := make(chan *MC, 1024)
- for i := 0; i < 16; i++ {
- go func() {
- for {
- mc := <-mcCh
- select {
- case mk := <-mc.ch:
- ins, ok := c.insMap[mk.InsId]
- if !ok {
- break
- }
- mk.SetIns(ins)
- ins.SetMk(mk)
- //DebugDelay("doInput", mk.InsId, mk.Timestamp)
- default:
- time.Sleep(time.Microsecond)
- }
- mcCh <- mc
- }
- }()
- }
- mapCh := make(map[string]chan *Market)
- for {
- mk := &Market{}
- err := c.dec.Decode(mk)
- if err != nil {
- c.reConnectSub(err)
- continue
- }
- _, ok := mapCh[mk.InsId]
- if !ok {
- ch := make(chan *Market, 1)
- mcCh <- &MC{ch}
- mapCh[mk.InsId] = ch
- }
- select {
- case mapCh[mk.InsId] <- mk:
- default:
- }
- }
- }
- func (c *ClientSimple) subMarket(insId string, h int64) error {
- args := SubArgs{insId, h, false}
- return c.enc.Encode(args)
- }
- // 订阅指定产品的行情
- func (c *ClientSimple) SubMarket(insId string) {
- _, ok := c.mapSubs[insId]
- if ok {
- return
- }
- log.Println("Client.SubMarket", insId)
- h := time.Now().UnixNano() + rand.Int63()
- c.mapSubs[insId] = h
- c.subMarket(insId, h)
- }
- // 取消订阅
- func (c *ClientSimple) UnsubMarket(insId string) error {
- log.Println("Client.UnsubMarket", insId)
- h, ok := c.mapSubs[insId]
- if !ok {
- return errors.New("UnsubMarket error: insId is NOT in mapSubs " + insId)
- }
- delete(c.mapSubs, insId)
- args := SubArgs{insId, h, true}
- return c.enc.Encode(args)
- }
- func (c *ClientSimple) Close() {
- c.clientSub.Close()
- }
|