123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
- package market
- // 本文件实现Server的定义: rpc服务接口定义
- import (
- "compress/gzip"
- "encoding/json"
- "io"
- "log"
- "net"
- "net/http"
- "net/rpc"
- "net/rpc/jsonrpc"
- "os"
- "os/signal"
- "strings"
- "syscall"
- )
- // 基本周期, 服务端只保存基本周期的数据, 而其他周期的需要客户端自己转换
- var basePeriodSet = map[int]struct{}{
- M1: {},
- M5: {},
- H1: {},
- D1: {},
- }
- // 实现rpc 服务接口
- type Server struct {
- ex *FzmEx // 交易所
- mhMap map[int64]int64 // 客户端订阅行情的handle, 用于客户端取消订阅
- dir string // http文件服务器路径
- }
- func NewServer(db *MyDB, dataPath string, inssFname string) *Server {
- return &Server{
- ex: NewFzmEx(db, inssFname),
- mhMap: make(map[int64]int64),
- dir: dataPath,
- }
- }
- func (s *Server) AddDS(name string, ds DataSource) {
- s.ex.AddDS(name, ds)
- }
- func (s *Server) SignalHandle() {
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGTERM)
- // Block until a signal is received.
- sig := <-c
- log.Println("Got signal:", sig)
- s.ex.SaveAllTicks()
- //log.Println("SaveAllTicks end")
- os.Exit(0)
- }
- func (s *Server) rpcRun(addr string) error {
- rpc.Register(s)
- l, err := net.Listen("tcp", addr)
- if err != nil {
- return err
- }
- for {
- conn, err := l.Accept()
- if err != nil {
- log.Fatal("Server rpc listen: accept:", err)
- }
- go jsonrpc.ServeConn(conn)
- }
- return nil
- }
- func (s *Server) httpRun(addr string) {
- http.HandleFunc("/cachedata", makeGzipHandler(s.cacheHandler))
- http.HandleFunc("/timelist", makeGzipHandler(s.timelistHandler))
- http.HandleFunc("/instruments", makeGzipHandler(s.instrumentsHandler))
- err := http.ListenAndServe(addr, nil)
- if err != nil {
- log.Fatal("ListenAndServe: ", err.Error())
- }
- }
- func (s *Server) marketServe(addr string) error {
- l, err := net.Listen("tcp", addr)
- if err != nil {
- return err
- }
- for {
- conn, err := l.Accept()
- if err != nil {
- log.Fatal("Server rpc listen: accept:", err)
- }
- go s.serveConn0(conn)
- }
- return nil
- }
- func (s *Server) serveConn0(conn net.Conn) {
- dec := json.NewDecoder(conn)
- enc := json.NewEncoder(conn)
- // conn.SetWriteDeadline(time.Time{})
- for {
- subArgs := &SubArgs{}
- err := dec.Decode(subArgs)
- if err != nil {
- //log.Println("Server.serveConn0 error:", err)
- conn.Close()
- return
- }
- if !subArgs.IsCancel {
- ev := s.ex.SubMarket(subArgs.InsId)
- if ev == nil {
- continue
- }
- ech := make(chan error, 1)
- vch := make(chan *Market, 1)
- go func() {
- for {
- mk := <-vch
- err := enc.Encode(mk)
- if err != nil {
- ech <- err
- // close connetion and force client reconnect
- conn.Close()
- return
- }
- //if mk.InsId == "tdx_000001" {
- //log.Println("[serveConn0 write]data trace")
- //}
- }
- }()
- h := ev.Attach(func(v interface{}) error {
- mk := v.(*Market)
- //if mk.InsId == "tdx_000001" {
- //log.Println("[serveConn0]data trace")
- //}
- select {
- case vch <- mk:
- case err := <-ech:
- if err != nil {
- //log.Println("enc.Encode(mk) error:", err, mk.InsId)
- return err
- }
- default:
- // <-vch
- }
- return nil
- })
- s.mhMap[subArgs.Code] = h
- } else {
- ev := s.ex.SubMarket(subArgs.InsId)
- if ev != nil {
- ev.Detach(s.mhMap[subArgs.Code])
- }
- }
- }
- }
- // ListenAndServe启动rpc服务和http文件下载服务
- func (s *Server) ListenAndServe(addr1, addr2 string) error {
- //go s.rpcRun(addr1)
- go s.httpRun(addr1)
- //go s.marketServe(addr2)
- //log.Println("the Server Listening on:", addr1, addr2)
- return s.marketServe(addr2) //http.ListenAndServe(":9090", http.FileServer(http.Dir(s.dir)))
- }
- // 指明最新时间
- var TimeNow = int64(-1)
- func (s *Server) getEx(insId string) (*FzmEx, error) {
- return s.ex, nil
- }
- func (s *Server) Heartbeat(id int64, status *int64) error {
- *status = id
- return nil
- }
- func (s *Server) cacheHandler(w http.ResponseWriter, r *http.Request) {
- symbol := r.FormValue("symbol")
- periodStr := r.FormValue("period")
- //log.Println(symbol, periodStr)
- period, ok := PeriodIdMap[periodStr]
- if !ok {
- io.WriteString(w, "period not supported")
- return
- }
- ex, err := s.getEx(symbol)
- if period != 0 {
- candles, err := ex.GetCacheCandles(symbol, period)
- if err != nil {
- //log.Println(err)
- }
- b, _ := json.Marshal(candles)
- io.WriteString(w, string(b))
- return
- }
- ticks, err := ex.GetCacheTicks(symbol)
- if err != nil {
- //log.Println(err)
- }
- b, _ := json.Marshal(ticks)
- io.WriteString(w, string(b))
- }
- func (s *Server) timelistHandler(w http.ResponseWriter, r *http.Request) {
- symbol := r.FormValue("symbol")
- period := r.FormValue("period")
- beginStr := r.FormValue("begin")
- //log.Println(symbol, period, beginStr)
- ex, err := s.getEx(symbol)
- timelist, err := ex.GetTimeList(symbol, period, beginStr)
- if err != nil {
- //log.Println(err)
- }
- //log.Println("timelist", timelist)
- b, _ := json.Marshal(timelist)
- io.WriteString(w, string(b))
- }
- func (s *Server) instrumentsHandler(w http.ResponseWriter, r *http.Request) {
- insMap := s.ex.Instruments()
- b, _ := json.Marshal(insMap)
- io.WriteString(w, string(b))
- }
- type gzipResponseWriter struct {
- io.Writer
- http.ResponseWriter
- }
- func (w gzipResponseWriter) Write(b []byte) (int, error) {
- return w.Writer.Write(b)
- }
- func (w gzipResponseWriter) Flush() {
- w.Writer.(*gzip.Writer).Flush()
- w.ResponseWriter.(http.Flusher).Flush()
- }
- func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- fn(w, r)
- return
- }
- w.Header().Set("Content-Encoding", "gzip")
- w.Header().Set("Content-Type", "text/javascript")
- gz := gzip.NewWriter(w)
- defer gz.Close()
- fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
- }
- }
|