server.go 6.0 KB


  1. // Copyright 2013-2014 Fuzamei tech Ltd. All rights reserved.
  2. package market
  3. // 本文件实现Server的定义: rpc服务接口定义
  4. import (
  5. "compress/gzip"
  6. "encoding/json"
  7. "io"
  8. "log"
  9. "net"
  10. "net/http"
  11. "net/rpc"
  12. "net/rpc/jsonrpc"
  13. "os"
  14. "os/signal"
  15. "strings"
  16. "syscall"
  17. )
  18. // 基本周期, 服务端只保存基本周期的数据, 而其他周期的需要客户端自己转换
  19. var basePeriodSet = map[int]struct{}{
  20. M1: {},
  21. M5: {},
  22. H1: {},
  23. D1: {},
  24. }
  25. // 实现rpc 服务接口
  26. type Server struct {
  27. ex *FzmEx // 交易所
  28. mhMap map[int64]int64 // 客户端订阅行情的handle, 用于客户端取消订阅
  29. dir string // http文件服务器路径
  30. }
  31. func NewServer(db *MyDB, dataPath string, inssFname string) *Server {
  32. return &Server{
  33. ex: NewFzmEx(db, inssFname),
  34. mhMap: make(map[int64]int64),
  35. dir: dataPath,
  36. }
  37. }
  38. func (s *Server) AddDS(name string, ds DataSource) {
  39. s.ex.AddDS(name, ds)
  40. }
  41. func (s *Server) SignalHandle() {
  42. c := make(chan os.Signal, 1)
  43. signal.Notify(c, syscall.SIGTERM)
  44. // Block until a signal is received.
  45. sig := <-c
  46. log.Println("Got signal:", sig)
  47. s.ex.SaveAllTicks()
  48. //log.Println("SaveAllTicks end")
  49. os.Exit(0)
  50. }
  51. func (s *Server) rpcRun(addr string) error {
  52. rpc.Register(s)
  53. l, err := net.Listen("tcp", addr)
  54. if err != nil {
  55. return err
  56. }
  57. for {
  58. conn, err := l.Accept()
  59. if err != nil {
  60. log.Fatal("Server rpc listen: accept:", err)
  61. }
  62. go jsonrpc.ServeConn(conn)
  63. }
  64. return nil
  65. }
  66. func (s *Server) httpRun(addr string) {
  67. http.HandleFunc("/cachedata", makeGzipHandler(s.cacheHandler))
  68. http.HandleFunc("/timelist", makeGzipHandler(s.timelistHandler))
  69. http.HandleFunc("/instruments", makeGzipHandler(s.instrumentsHandler))
  70. err := http.ListenAndServe(addr, nil)
  71. if err != nil {
  72. log.Fatal("ListenAndServe: ", err.Error())
  73. }
  74. }
  75. func (s *Server) marketServe(addr string) error {
  76. l, err := net.Listen("tcp", addr)
  77. if err != nil {
  78. return err
  79. }
  80. for {
  81. conn, err := l.Accept()
  82. if err != nil {
  83. log.Fatal("Server rpc listen: accept:", err)
  84. }
  85. go s.serveConn0(conn)
  86. }
  87. return nil
  88. }
  89. func (s *Server) serveConn0(conn net.Conn) {
  90. dec := json.NewDecoder(conn)
  91. enc := json.NewEncoder(conn)
  92. // conn.SetWriteDeadline(time.Time{})
  93. for {
  94. subArgs := &SubArgs{}
  95. err := dec.Decode(subArgs)
  96. if err != nil {
  97. //log.Println("Server.serveConn0 error:", err)
  98. conn.Close()
  99. return
  100. }
  101. if !subArgs.IsCancel {
  102. ev := s.ex.SubMarket(subArgs.InsId)
  103. if ev == nil {
  104. continue
  105. }
  106. ech := make(chan error, 1)
  107. vch := make(chan *Market, 1)
  108. go func() {
  109. for {
  110. mk := <-vch
  111. err := enc.Encode(mk)
  112. if err != nil {
  113. ech <- err
  114. // close connetion and force client reconnect
  115. conn.Close()
  116. return
  117. }
  118. //if mk.InsId == "tdx_000001" {
  119. //log.Println("[serveConn0 write]data trace")
  120. //}
  121. }
  122. }()
  123. h := ev.Attach(func(v interface{}) error {
  124. mk := v.(*Market)
  125. //if mk.InsId == "tdx_000001" {
  126. //log.Println("[serveConn0]data trace")
  127. //}
  128. select {
  129. case vch <- mk:
  130. case err := <-ech:
  131. if err != nil {
  132. //log.Println("enc.Encode(mk) error:", err, mk.InsId)
  133. return err
  134. }
  135. default:
  136. // <-vch
  137. }
  138. return nil
  139. })
  140. s.mhMap[subArgs.Code] = h
  141. } else {
  142. ev := s.ex.SubMarket(subArgs.InsId)
  143. if ev != nil {
  144. ev.Detach(s.mhMap[subArgs.Code])
  145. }
  146. }
  147. }
  148. }
  149. // ListenAndServe启动rpc服务和http文件下载服务
  150. func (s *Server) ListenAndServe(addr1, addr2 string) error {
  151. //go s.rpcRun(addr1)
  152. go s.httpRun(addr1)
  153. //go s.marketServe(addr2)
  154. //log.Println("the Server Listening on:", addr1, addr2)
  155. return s.marketServe(addr2) //http.ListenAndServe(":9090", http.FileServer(http.Dir(s.dir)))
  156. }
  157. // 指明最新时间
  158. var TimeNow = int64(-1)
  159. func (s *Server) getEx(insId string) (*FzmEx, error) {
  160. return s.ex, nil
  161. }
  162. func (s *Server) Heartbeat(id int64, status *int64) error {
  163. *status = id
  164. return nil
  165. }
  166. func (s *Server) cacheHandler(w http.ResponseWriter, r *http.Request) {
  167. symbol := r.FormValue("symbol")
  168. periodStr := r.FormValue("period")
  169. //log.Println(symbol, periodStr)
  170. period, ok := PeriodIdMap[periodStr]
  171. if !ok {
  172. io.WriteString(w, "period not supported")
  173. return
  174. }
  175. ex, err := s.getEx(symbol)
  176. if period != 0 {
  177. candles, err := ex.GetCacheCandles(symbol, period)
  178. if err != nil {
  179. //log.Println(err)
  180. }
  181. b, _ := json.Marshal(candles)
  182. io.WriteString(w, string(b))
  183. return
  184. }
  185. ticks, err := ex.GetCacheTicks(symbol)
  186. if err != nil {
  187. //log.Println(err)
  188. }
  189. b, _ := json.Marshal(ticks)
  190. io.WriteString(w, string(b))
  191. }
  192. func (s *Server) timelistHandler(w http.ResponseWriter, r *http.Request) {
  193. symbol := r.FormValue("symbol")
  194. period := r.FormValue("period")
  195. beginStr := r.FormValue("begin")
  196. //log.Println(symbol, period, beginStr)
  197. ex, err := s.getEx(symbol)
  198. timelist, err := ex.GetTimeList(symbol, period, beginStr)
  199. if err != nil {
  200. //log.Println(err)
  201. }
  202. //log.Println("timelist", timelist)
  203. b, _ := json.Marshal(timelist)
  204. io.WriteString(w, string(b))
  205. }
  206. func (s *Server) instrumentsHandler(w http.ResponseWriter, r *http.Request) {
  207. insMap := s.ex.Instruments()
  208. b, _ := json.Marshal(insMap)
  209. io.WriteString(w, string(b))
  210. }
  211. type gzipResponseWriter struct {
  212. io.Writer
  213. http.ResponseWriter
  214. }
  215. func (w gzipResponseWriter) Write(b []byte) (int, error) {
  216. return w.Writer.Write(b)
  217. }
  218. func (w gzipResponseWriter) Flush() {
  219. w.Writer.(*gzip.Writer).Flush()
  220. w.ResponseWriter.(http.Flusher).Flush()
  221. }
  222. func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
  223. return func(w http.ResponseWriter, r *http.Request) {
  224. if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  225. fn(w, r)
  226. return
  227. }
  228. w.Header().Set("Content-Encoding", "gzip")
  229. w.Header().Set("Content-Type", "text/javascript")
  230. gz := gzip.NewWriter(w)
  231. defer gz.Close()
  232. fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
  233. }
  234. }