123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package main
- import (
- "compress/gzip"
- "encoding/json"
- "io"
- "tickserver/api/lmaxapi"
- "tickserver/api/lmaxapi/main/common"
- "tickserver/api/lmaxapi/request"
- "tickserver/api/lmaxapi/response"
- "log"
- "net/http"
- "strings"
- "sync"
- "time"
- )
- var instrumentMap map[int64]*response.Instrument
- var imu sync.Mutex
- var priceMap = make(map[int64]*response.OrderBookEvent)
- var pmu sync.Mutex
- func setPrice(event *response.OrderBookEvent) {
- pmu.Lock()
- defer pmu.Unlock()
- priceMap[event.InstrumentId] = event
- }
- func getPrice() (events []*response.OrderBookEvent) {
- pmu.Lock()
- defer pmu.Unlock()
- for _, item := range priceMap {
- events = append(events, item)
- }
- return
- }
- func setInst(id int64, inst *response.Instrument) {
- imu.Lock()
- defer imu.Unlock()
- if id == 0 && inst == nil {
- instrumentMap = nil
- }
- if instrumentMap == nil {
- instrumentMap = make(map[int64]*response.Instrument)
- }
- instrumentMap[id] = inst
- }
- func getInst(id int64) *response.Instrument {
- imu.Lock()
- defer imu.Unlock()
- data, ok := instrumentMap[id]
- if ok {
- return data
- }
- return nil
- }
- func main() {
- ch := make(chan *response.OrderBookEvent, 100)
- go func() {
- for {
- event := <-ch
- setPrice(event)
- log.Println("dt = ", time.Now().UnixNano() / 1e6 - event.Timestamp, time.Unix(event.Timestamp / 1000, (event.Timestamp % 1000) * 1e6), event.InstrumentId)
- }
- }()
- go httpserver()
- for {
- marketdata(ch)
- //session stop的情况下,休息一会
- time.Sleep(1 * time.Second)
- }
- }
- func marketdata(ch chan *response.OrderBookEvent) {
- log.Println("MarketDataClient2")
- logw, err := common.EnableLog("proto.log")
- if err != nil {
- log.Println("EnableLog Failed:", err)
- return
- }
- defer logw.Close()
- session, err := common.CreateSession()
- if err != nil {
- log.Println("Login Failed:", err)
- time.Sleep(10 * time.Second)
- return
- }
- failed := 0
- session.RegisterStreamFailureEvent(func(s *lmaxapi.Session, err error) {
- //1. 处理下面的事件
- //1.1 网络中断:如果是交易,可能需要重新登录,以获取最新订单状态,如果是行情,可能不需要重启
- //1.2 session 过期,发生403错误,必须重新登录
- //1.3 heartbeat 心跳无响应: Op=stream err=heart beart timeout, Code=-1
- // 这个时候调用 session.StreamClose() 重新启动stream
- operr, ok := err.(*lmaxapi.OpError)
- log.Println("operr:", operr)
- if !ok {
- return
- }
- //1.2
- if operr.Code == 403 {
- log.Println("stop session")
- session.Stop()
- return
- }
- //1.1 and 1.3
- //stream 中发生错误,重启stream, 如果是交易,可能要选择重启session
- if operr.Op == "Stream" {
- log.Println("stop stream")
- if operr.Code == 0 {
- time.Sleep(1 * time.Second)
- }
- failed++
- if (failed == 3) {
- session.Stop()
- return
- }
- session.StopStream()
- return
- }
- })
- session.RegisterOrderBookEvent(func(s *lmaxapi.Session, event *response.OrderBookEvent) {
- failed = 0
- select {
- case ch <- event:
- default:
- }
- })
- log.Println("begin subscribe")
- session.LoadAllInstruments(func(value *response.Instrument) {
- if (value.Id > 4020) {
- return
- }
- setInst(value.Id, value)
- session.Subscribe(request.NewOrderBookSubscriptionRequest(value.Id), common.DefaultSubscribeCB)
- })
- session.Wait()
- log.Println("end subscribe")
- //检查stream 是否 alive, 5s 请求heartbeat 5s内没有任何响应,就是stream已经损坏
- session.HeartbeatTimeout(5 * time.Second)
- session.Start()
- }
- func httpserver() {
- s := &http.Server{
- Addr: ":6060",
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- MaxHeaderBytes: 1 << 20,
- }
- http.Handle("/tickdata", MakeGzipHandler(http.HandlerFunc(tickdata)))
- log.Fatal(s.ListenAndServe())
- }
- func tickdata(w http.ResponseWriter, r *http.Request) {
- //ty := r.FormValue("t")
- cb := r.FormValue("callback")
- price := getPrice()
- str := "if (" + cb + ") " + cb + "("
- js, _ := json.Marshal(price)
- data := append([]byte(str), js...)
- data = append(data, []byte(")")...)
- w.Write(data)
- }
// MakeGzipHandler wraps fn so that its response body is gzip-compressed when
// the request's Accept-Encoding header contains "gzip". In that case the
// response is sent with "Content-Encoding: gzip" and
// "Content-Type: text/javascript", and fn writes through a gzip stream.
// Otherwise fn is called with the original ResponseWriter.
//
// Every response carries "Vary: Accept-Encoding", because the body depends on
// that request header and caches must not reuse a compressed body for a
// client that did not ask for one (or the other way round).
func MakeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("Vary", "Accept-Encoding")

		if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
			fn(w, r)
			return
		}

		w.Header().Set("Content-Encoding", "gzip")
		w.Header().Set("Content-Type", "text/javascript")

		gz := gzip.NewWriter(w)
		// Close flushes the buffered compressed data and the gzip footer. Its
		// error is ignored: the response is already committed by then, so
		// there is nothing left to report to the client.
		defer gz.Close()

		fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
	}
}

// gzipResponseWriter is an http.ResponseWriter whose body writes go to Writer
// (the gzip stream) while Header and WriteHeader are served by the embedded
// ResponseWriter.
type gzipResponseWriter struct {
	io.Writer
	http.ResponseWriter
}

// Write sends b to the embedded Writer. Defining it explicitly resolves the
// otherwise ambiguous Write promoted from both embedded fields.
func (w gzipResponseWriter) Write(b []byte) (int, error) {
	return w.Writer.Write(b)
}
|