// Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved. package market // 本文件实现Server的定义: rpc服务接口定义 import ( "compress/gzip" "encoding/json" "io" "log" "net" "net/http" "net/rpc" "net/rpc/jsonrpc" "os" "os/signal" "strings" "syscall" ) // 基本周期, 服务端只保存基本周期的数据, 而其他周期的需要客户端自己转换 var basePeriodSet = map[int]struct{}{ M1: {}, M5: {}, H1: {}, D1: {}, } // 实现rpc 服务接口 type Server struct { ex *FzmEx // 交易所 mhMap map[int64]int64 // 客户端订阅行情的handle, 用于客户端取消订阅 dir string // http文件服务器路径 } func NewServer(db *MyDB, dataPath string, inssFname string) *Server { return &Server{ ex: NewFzmEx(db, inssFname), mhMap: make(map[int64]int64), dir: dataPath, } } func (s *Server) AddDS(name string, ds DataSource) { s.ex.AddDS(name, ds) } func (s *Server) SignalHandle() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGTERM) // Block until a signal is received. sig := <-c log.Println("Got signal:", sig) s.ex.SaveAllTicks() //log.Println("SaveAllTicks end") os.Exit(0) } func (s *Server) rpcRun(addr string) error { rpc.Register(s) l, err := net.Listen("tcp", addr) if err != nil { return err } for { conn, err := l.Accept() if err != nil { log.Fatal("Server rpc listen: accept:", err) } go jsonrpc.ServeConn(conn) } return nil } func (s *Server) httpRun(addr string) { http.HandleFunc("/cachedata", makeGzipHandler(s.cacheHandler)) http.HandleFunc("/timelist", makeGzipHandler(s.timelistHandler)) http.HandleFunc("/instruments", makeGzipHandler(s.instrumentsHandler)) err := http.ListenAndServe(addr, nil) if err != nil { log.Fatal("ListenAndServe: ", err.Error()) } } func (s *Server) marketServe(addr string) error { l, err := net.Listen("tcp", addr) if err != nil { return err } for { conn, err := l.Accept() if err != nil { log.Fatal("Server rpc listen: accept:", err) } go s.serveConn0(conn) } return nil } func (s *Server) serveConn0(conn net.Conn) { dec := json.NewDecoder(conn) enc := json.NewEncoder(conn) // conn.SetWriteDeadline(time.Time{}) for { subArgs := &SubArgs{} err := dec.Decode(subArgs) if err != nil { //log.Println("Server.serveConn0 error:", err) conn.Close() return } if !subArgs.IsCancel { ev := s.ex.SubMarket(subArgs.InsId) if ev == nil { continue } ech := make(chan error, 1) vch := make(chan *Market, 1) go func() { for { mk := <-vch err := enc.Encode(mk) if err != nil { ech <- err // close connetion and force client reconnect conn.Close() return } //if mk.InsId == "tdx_000001" { //log.Println("[serveConn0 write]data trace") //} } }() h := ev.Attach(func(v interface{}) error { mk := v.(*Market) //if mk.InsId == "tdx_000001" { //log.Println("[serveConn0]data trace") //} select { case vch <- mk: case err := <-ech: if err != nil { //log.Println("enc.Encode(mk) error:", err, mk.InsId) return err } default: // <-vch } return nil }) s.mhMap[subArgs.Code] = h } else { ev := s.ex.SubMarket(subArgs.InsId) if ev != nil { ev.Detach(s.mhMap[subArgs.Code]) } } } } // ListenAndServe启动rpc服务和http文件下载服务 func (s *Server) ListenAndServe(addr1, addr2 string) error { //go s.rpcRun(addr1) go s.httpRun(addr1) //go s.marketServe(addr2) //log.Println("the Server Listening on:", addr1, addr2) return s.marketServe(addr2) //http.ListenAndServe(":9090", http.FileServer(http.Dir(s.dir))) } // 指明最新时间 var TimeNow = int64(-1) func (s *Server) getEx(insId string) (*FzmEx, error) { return s.ex, nil } func (s *Server) Heartbeat(id int64, status *int64) error { *status = id return nil } func (s *Server) cacheHandler(w http.ResponseWriter, r *http.Request) { symbol := r.FormValue("symbol") periodStr := r.FormValue("period") //log.Println(symbol, periodStr) period, ok := PeriodIdMap[periodStr] if !ok { io.WriteString(w, "period not supported") return } ex, err := s.getEx(symbol) if period != 0 { candles, err := ex.GetCacheCandles(symbol, period) if err != nil { //log.Println(err) } b, _ := json.Marshal(candles) io.WriteString(w, string(b)) return } ticks, err := ex.GetCacheTicks(symbol) if err != nil { //log.Println(err) } b, _ := json.Marshal(ticks) io.WriteString(w, string(b)) } func (s *Server) timelistHandler(w http.ResponseWriter, r *http.Request) { symbol := r.FormValue("symbol") period := r.FormValue("period") beginStr := r.FormValue("begin") //log.Println(symbol, period, beginStr) ex, err := s.getEx(symbol) timelist, err := ex.GetTimeList(symbol, period, beginStr) if err != nil { //log.Println(err) } //log.Println("timelist", timelist) b, _ := json.Marshal(timelist) io.WriteString(w, string(b)) } func (s *Server) instrumentsHandler(w http.ResponseWriter, r *http.Request) { insMap := s.ex.Instruments() b, _ := json.Marshal(insMap) io.WriteString(w, string(b)) } type gzipResponseWriter struct { io.Writer http.ResponseWriter } func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() w.ResponseWriter.(http.Flusher).Flush() } func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { fn(w, r) return } w.Header().Set("Content-Encoding", "gzip") w.Header().Set("Content-Type", "text/javascript") gz := gzip.NewWriter(w) defer gz.Close() fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r) } }