package main import ( "compress/gzip" "encoding/json" "io" "tickserver/api/lmaxapi" "tickserver/api/lmaxapi/main/common" "tickserver/api/lmaxapi/request" "tickserver/api/lmaxapi/response" "log" "net/http" "strings" "sync" "time" ) var instrumentMap map[int64]*response.Instrument var imu sync.Mutex var priceMap = make(map[int64]*response.OrderBookEvent) var pmu sync.Mutex func setPrice(event *response.OrderBookEvent) { pmu.Lock() defer pmu.Unlock() priceMap[event.InstrumentId] = event } func getPrice() (events []*response.OrderBookEvent) { pmu.Lock() defer pmu.Unlock() for _, item := range priceMap { events = append(events, item) } return } func setInst(id int64, inst *response.Instrument) { imu.Lock() defer imu.Unlock() if id == 0 && inst == nil { instrumentMap = nil } if instrumentMap == nil { instrumentMap = make(map[int64]*response.Instrument) } instrumentMap[id] = inst } func getInst(id int64) *response.Instrument { imu.Lock() defer imu.Unlock() data, ok := instrumentMap[id] if ok { return data } return nil } func main() { ch := make(chan *response.OrderBookEvent, 100) go func() { for { event := <-ch setPrice(event) log.Println("dt = ", time.Now().UnixNano() / 1e6 - event.Timestamp, time.Unix(event.Timestamp / 1000, (event.Timestamp % 1000) * 1e6), event.InstrumentId) } }() go httpserver() for { marketdata(ch) //session stop的情况下,休息一会 time.Sleep(1 * time.Second) } } func marketdata(ch chan *response.OrderBookEvent) { log.Println("MarketDataClient2") logw, err := common.EnableLog("proto.log") if err != nil { log.Println("EnableLog Failed:", err) return } defer logw.Close() session, err := common.CreateSession() if err != nil { log.Println("Login Failed:", err) time.Sleep(10 * time.Second) return } failed := 0 session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) { //1. 处理下面的事件 //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启 //1.2 session 过期,发生403错误,必须重新登录 //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1 // 这个时候调用 session.StreamClose() 重新启动stream operr, ok := err.(*lmaxapi.OpError) log.Println("operr:", operr) if !ok { return } //1.2 if operr.Code == 403 { log.Println("stop session") session.Stop() return } //1.1 and 1.3 //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session if operr.Op == "Stream" { log.Println("stop stream") if operr.Code == 0 { time.Sleep(1 * time.Second) } failed++ if (failed == 3) { session.Stop() return } session.StopStream() return } }) session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) { failed = 0 select { case ch <- event: default: } }) log.Println("begin subscribe") session.LoadAllInstruments(func(value *response.Instrument) { if (value.Id > 4020) { return } setInst(value.Id, value) session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB) }) session.Wait() log.Println("end subscribe") //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏 session.HeartbeatTimeout(5 * time.Second) session.Start() } func httpserver() { s := &http.Server{ Addr: ":6060", ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata))) log.Fatal(s.ListenAndServe()) } func tickdata(w http.ResponseWriter, r *http.Request) { //ty := r.FormValue("t") cb := r.FormValue("callback") price := getPrice() str := "if (" + cb + ") " + cb + "(" js, _ := json.Marshal(price) data := append([]byte(str), js...) data = append(data, []byte(")")...) w.Write(data) } func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { fn(w, r) return } w.Header().Set("Content-Encoding", "gzip") w.Header().Set("Content-Type", "text/javascript") gz := gzip.NewWriter(w) defer gz.Close() fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r) } } type gzipResponseWriter struct { io.Writer http.ResponseWriter } func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) }