// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package client // 本文件为简化客户端的实现, 对Server服务调用进行封装 import ( "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "io" "io/ioutil" "log" "math/rand" "net" "net/http" "net/url" "os" "path" "sync" "time" ) // Client对Server的rpc调用进行封装 // 并保存下载的历史数据, 避免重复下载 // 使用数据库管理下载的数据 type ClientSimple struct { clientSub net.Conn // 由于服务器行情数据较多, 为避免和其他数据冲突, 采用新的连接单独处理行情数据 enc encoder dec decoder insMap map[string]*Instrument // 产品列表 httpComAddr, subAddr, httpAddr string // 服务器地址 mapSubs map[string]int64 dir string mu sync.Mutex } var instance *ClientSimple var mu sync.Mutex func GetClientInstance(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) { mu.Lock() defer mu.Unlock() if instance != nil { return instance, nil } var err error instance, err = NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir) if err != nil { instance = nil } return instance, err } //简化下载流程,去掉数据库,直接判断文件是否存在来查看本地是否有cache func NewClientSimple(rpcAddr, rtRpcAddr, subAddr, httpAddr, dir string) (*ClientSimple, error) { rand.Seed(time.Now().UnixNano()) c := &ClientSimple{ insMap: make(map[string]*Instrument), httpComAddr: rpcAddr, subAddr: subAddr, httpAddr: httpAddr, dir: dir, mapSubs: make(map[string]int64), } err := c.connectSub(subAddr) if err != nil { return nil, err } err = c.getInsMap() if err != nil { return nil, err } go c.doInput() go c.doSubTest() return c, nil } func (c *ClientSimple) GetInsName(insId string) string { return c.insMap[insId].Name } func (c *ClientSimple) getInsMap() error { u := fmt.Sprintf("http://%s/instruments", c.httpComAddr) log.Println("beg", u) resp, err := http.Get(u) log.Println("end", u) if err != nil { return err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return err } json.Unmarshal(data, &c.insMap) return nil } func (c *ClientSimple) reConnectSub(err error) { c.clientSub.Close() for err != nil { log.Println("client.reConnectSub error:", err) err = c.connectSub(c.subAddr) time.Sleep(time.Second * 10) } for insId, h := range c.mapSubs { c.subMarket(insId, h) } } func (c *ClientSimple) connectSub(saddr string) error { clientSub, err := net.Dial("tcp", saddr) if err != nil { return err } enc := json.NewEncoder(clientSub) dec := json.NewDecoder(clientSub) c.clientSub = clientSub c.enc = enc c.dec = dec return nil } func (c *ClientSimple) GetInsMap() map[string]*Instrument { return c.insMap } func (c *ClientSimple) GetIns(insId string) *Instrument { return c.insMap[insId] } // downloadOne下载一个文件 // 可以同时调用下载多个文件以增加下载效率 // 参数done用于指明此函数调用是否完成 func (c *ClientSimple) downloadOne(insId, date string, period int) (string, error) { c.mu.Lock() defer c.mu.Unlock() var u string if period == D1 { u = fmt.Sprintf("http://%s/%s/%s/D1.gz", c.httpAddr, InsIdPrefix(insId), insId) } else { var year, month, day int fmt.Sscanf(date, "%04d%02d%02d", &year, &month, &day) yearStr := fmt.Sprintf("%04d", year) periodStr := PeriodNameMap[period] if period == TK { periodStr = "tk" } u = fmt.Sprintf("http://%s/%s/%s/%s/%s.%s.gz", c.httpAddr, InsIdPrefix(insId), insId, yearStr, date, periodStr) } surl, err := url.Parse(u) if err != nil { return "", err } fname := path.Join(c.dir, surl.Path) dir := path.Dir(fname) os.MkdirAll(dir, 0777) if isExist(fname, date, period) { return fname, nil } res, err := http.Get(u) if err != nil { return "", err } defer res.Body.Close() w, err := os.Create(fname) if err != nil { return "", err } defer w.Close() _, err = io.Copy(w, res.Body) if err != nil { return "", err } return fname, nil } func isExist(fname, date string, period int) bool { if period == D1 { return false } if _, err := os.Stat(fname); os.IsNotExist(err) { return false } if period == TK { today := time.Now() todayDate := fmt.Sprintf("%04d%02d%02d", today.Year(), today.Month(), today.Day()) yesterday := today.AddDate(0, 0, -1) yesterdayDate := fmt.Sprintf("%04d%02d%02d", yesterday.Year(), yesterday.Month(), yesterday.Day()) if (date == todayDate) || (date == yesterdayDate) { return false } else { return true } } return true } func (c *ClientSimple) GetTickHistory(insId string, n int, ts int64) ([]Tick, error) { ticks, _, err := c.GetHistory(insId, 0, n, ts) return ticks, err } func (c *ClientSimple) GetCandleHistory(insId string, period, n int, ts int64) ([]Candle, error) { _, candles, err := c.GetHistory(insId, period, n, ts) return candles, err } func (c *ClientSimple) GetHis(insId string, period, n int, timestamp int64) (io.ReadCloser, error) { if timestamp < 0 { timestamp = time.Now().Unix() * 1000 } if n == 0 { return nil, errors.New("no data needed.") } var tBegin int64 if n < 0 { tBegin = 0 } if n > 0 { tBegin = timestamp - 3600*24*1000 } t := time.Unix(tBegin/1000, 0) periodStr := PeriodNameMap[period] if period == TK { periodStr = "tk" } u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day()) resp, err := http.Get(u) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var timelist []string json.Unmarshal(data, &timelist) if period == D1 { timelist = append(timelist, "xxx") } var fileIndex int if n < 0 { fileIndex = len(timelist) - 1 tPoint := time.Unix(timestamp/1000+3600*24, 0) datePoint := fmt.Sprintf("%04d%02d%02d", tPoint.Year(), tPoint.Month(), tPoint.Day()) for i, v := range timelist { if v >= datePoint { fileIndex = i break } } } return &HistoryResponse{ clientsimple: c, timelist: timelist[:], insId: insId, period: period, n: n, ts: timestamp, fileIndex: fileIndex, buf: bytes.NewBuffer([]byte("")), }, nil } func (c *ClientSimple) GetHisEx(insId string, period int, st, et int64) (io.ReadCloser, error) { tBegin := st - 3600*24*1000 t := time.Unix(tBegin/1000, 0) periodStr := PeriodNameMap[period] if period == TK { periodStr = "tk" } u := fmt.Sprintf("http://%s/timelist?symbol=%s&period=%s&begin=%04d%02d%02d", c.httpComAddr, insId, periodStr, t.Year(), t.Month(), t.Day()) log.Println("down beg", u) resp, err := http.Get(u) log.Println("down end", u, err) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var timelist []string json.Unmarshal(data, &timelist) if period == D1 { timelist = append(timelist, "xxx") } return &HistoryResponseEx{ clientsimple: c, timelist: timelist[:], insId: insId, period: period, st: st, et: et, buf: bytes.NewBuffer([]byte("")), }, nil } //GetHistory函数封装了rpc的调用, 并对结果做相应的处理 func (c *ClientSimple) GetHistory(insId string, period, nn int, ts int64) ([]Tick, []Candle, error) { if period < M1 { period = 0 } resp, err := c.GetHis(insId, period, nn, ts) if err != nil { //log.Println("@@@@@", err) return nil, nil, err } if period == 0 { if nn < 0 { nn = -nn } var ticks []Tick for { var tick Tick err = binary.Read(resp, binary.LittleEndian, &tick) if err != nil { if err != io.EOF { log.Println(err) } break } ticks = append(ticks, tick) } if len(ticks) < nn { return ticks, nil, ErrNotEnough } return ticks, nil, nil } if nn < 0 { nn = -nn } var candles []Candle for { var candle Candle err = binary.Read(resp, binary.LittleEndian, &candle) if err != nil { if err != io.EOF { log.Println(err) } break } candles = append(candles, candle) } if len(candles) < nn { return nil, candles, ErrNotEnough } return nil, candles, nil } func (c *ClientSimple) GetHistoryEx(insId string, period int, st, et int64) ([]Tick, []Candle, error) { if period < M1 { period = 0 } resp, err := c.GetHisEx(insId, period, st, et) if err != nil { log.Println("@@@@@", err) return nil, nil, err } if period == 0 { var ticks []Tick for { var tick Tick err = binary.Read(resp, binary.LittleEndian, &tick) if err != nil { if err != io.EOF { log.Println(err) } break } if tick.Timestamp < st { continue } if tick.Timestamp > et { break } ticks = append(ticks, tick) } return ticks, nil, nil } var candles []Candle for { var candle Candle err = binary.Read(resp, binary.LittleEndian, &candle) if err != nil { if err != io.EOF { log.Println(err) } break } if candle.Timestamp < st { continue } if candle.Timestamp > et { break } candles = append(candles, candle) } return nil, candles, nil } func (c *ClientSimple) GetLastTicks(insId string, n int) ([]Tick, error) { u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK]) resp, err := http.Get(u) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var ticks []Tick json.Unmarshal(data, &ticks) p := len(ticks) - n if p < 0 { p = 0 } return ticks[p:], nil } func (c *ClientSimple) GetLastCandles(insId string, period, n int) ([]Candle, error) { u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period]) resp, err := http.Get(u) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var candles []Candle json.Unmarshal(data, &candles) p := len(candles) - n if p < 0 { p = 0 } return candles[p:], nil } func (c *ClientSimple) GetLastTicksByTime(insId string, st, et int64) ([]Tick, error) { u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[TK]) resp, err := http.Get(u) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var ticks []Tick json.Unmarshal(data, &ticks) tLen := len(ticks) if tLen <= 0 { return nil, nil } tSt := ticks[0].Timestamp tEt := ticks[tLen-1].Timestamp if st > tEt || et < tSt { return nil, nil } begin, end := -1, -1 for i, v := range ticks { if v.Timestamp > et { break } if v.Timestamp >= st { if begin == -1 { begin = i } else { end = i } } } if begin != -1 && end != -1 { return ticks[begin:end], nil } else { return nil, nil } } func (c *ClientSimple) GetLastCandlesByTime(insId string, period int, st, et int64) ([]Candle, error) { u := fmt.Sprintf("http://%s/cachedata?symbol=%s&period=%s", c.httpComAddr, insId, PeriodNameMap[period]) resp, err := http.Get(u) if err != nil { return nil, err } defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err } var candles []Candle json.Unmarshal(data, &candles) cLen := len(candles) if cLen <= 0 { return nil, nil } cSt := candles[0].Timestamp cEt := candles[cLen-1].Timestamp if st > cEt || et < cSt { return nil, nil } begin, end := -1, -1 for i, v := range candles { if v.Timestamp > et { break } if v.Timestamp >= st { if begin == -1 { begin = i } else { end = i } } } if begin != -1 && end != -1 { return candles[begin:end], nil } else { return nil, nil } } func (c *ClientSimple) doSubTest() { t := time.Tick(time.Second * 30) for _ = range t { c.subMarket("0", 0) } } func (c *ClientSimple) doInput() { type MC struct { ch chan *Market } mcCh := make(chan *MC, 1024) for i := 0; i < 16; i++ { go func() { for { mc := <-mcCh select { case mk := <-mc.ch: ins, ok := c.insMap[mk.InsId] if !ok { break } mk.SetIns(ins) ins.SetMk(mk) //DebugDelay("doInput", mk.InsId, mk.Timestamp) default: time.Sleep(time.Microsecond) } mcCh <- mc } }() } mapCh := make(map[string]chan *Market) for { mk := &Market{} err := c.dec.Decode(mk) if err != nil { c.reConnectSub(err) continue } _, ok := mapCh[mk.InsId] if !ok { ch := make(chan *Market, 1) mcCh <- &MC{ch} mapCh[mk.InsId] = ch } select { case mapCh[mk.InsId] <- mk: default: } } } func (c *ClientSimple) subMarket(insId string, h int64) error { args := SubArgs{insId, h, false} return c.enc.Encode(args) } // 订阅指定产品的行情 func (c *ClientSimple) SubMarket(insId string) { _, ok := c.mapSubs[insId] if ok { return } log.Println("Client.SubMarket", insId) h := time.Now().UnixNano() + rand.Int63() c.mapSubs[insId] = h c.subMarket(insId, h) } // 取消订阅 func (c *ClientSimple) UnsubMarket(insId string) error { log.Println("Client.UnsubMarket", insId) h, ok := c.mapSubs[insId] if !ok { return errors.New("UnsubMarket error: insId is NOT in mapSubs " + insId) } delete(c.mapSubs, insId) args := SubArgs{insId, h, true} return c.enc.Encode(args) } func (c *ClientSimple) Close() { c.clientSub.Close() }