package tick import "errors" import "net/http" import "github.com/niniwzw/http2" import "os" import "log" import "tickserver/framework/msq" import _ "github.com/go-sql-driver/mysql" import "os/signal" import "encoding/json" import "strings" import "compress/gzip" import "io" import "io/ioutil" import "net" import "crypto/tls" import "time" import "tickserver/framework/base" import "tickserver/markinfo" var _ = json.Marshal func readcert(file string) ([]byte, error) { return ioutil.ReadFile(file) } func serveProdTLS(port string) error { certPem, err := readcert("macoin.org.crt") if err != nil { return err } keyPem, err := readcert("macoin.org.key") if err != nil { return err } cert, err := tls.X509KeyPair(certPem, keyPem) if err != nil { return err } srv := &http.Server{ TLSConfig: &tls.Config{ Certificates: []tls.Certificate{cert}, }, } http2.ConfigureServer(srv, &http2.Server{}) ln, err := net.Listen("tcp", port) if err != nil { return err } //log.Println("listen tcp", port) return srv.Serve(tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, srv.TLSConfig)) } type tcpKeepAliveListener struct { *net.TCPListener } func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { tc, err := ln.AcceptTCP() if err != nil { return } tc.SetKeepAlive(true) tc.SetKeepAlivePeriod(3 * time.Minute) return tc, nil } func signalHandle(lockname string) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill) // Block until a signal is received. s := <-c log.Println("Got signal:", s) os.Exit(0) } type gzipResponseWriter struct { io.Writer http.ResponseWriter } func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) } func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() w.ResponseWriter.(http.Flusher).Flush() } func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { fn(w, r) return } w.Header().Set("Content-Encoding", "gzip") w.Header().Set("Content-Type", "text/javascript") gz := gzip.NewWriter(w) defer gz.Close() fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r) } } func RunServer(addr string, logfile string, isssl bool) error { go signalHandle("runserver") //go http.ListenAndServe(":8080", http.FileServer(http.Dir(serverconf.DataDir))) logw, e := os.Create(logfile) if e != nil { log.Fatal(e) } defer logw.Close() log.SetOutput(logw) //订单 http.HandleFunc("/ticks", makeGzipHandler(ticks)) http.HandleFunc("/download", makeGzipHandler(download)) http.HandleFunc("/history", makeGzipHandler(history)) http.HandleFunc("/stream", makeGzipHandler(stream)) http.HandleFunc("/instruments", makeGzipHandler(instruments)) http.HandleFunc("/instrument", makeGzipHandler(instrument)) if isssl { err := serveProdTLS(addr) if err != nil { panic(err) } } else { s := &http.Server{ Addr: addr, MaxHeaderBytes: 1 << 20, } s.ListenAndServe() } return nil } type TypeRequest struct { Type string `json:"type"` } type InstrumentsRequest struct { Type string `json:"type"` } type InstrumentRequest struct { Type string `json:"type"` Id int64 `json:"id"` } type StreamRequest struct { Type string `json:"type"` LastCount int `json:"lastcount"` //chan中数据太多的情况下,要抛弃部分数据,这个是要保留的数据,如果是0 的话,表示保留所有的数据 } //分页下载 type DownloadRequest struct { Type string `json:"type"` Start int64 `json:"start"` End int64 `json:"end"` Offset int `json:"offset"` Count int `json:"count"` OrderBy string `json:"orderby"` } const ( Lmax = "lmax" Oanda = "oanda" EasyForex = "easyforex" Ctp = "ctp" Fix = "fix" Dzh = "dzh" Saxo = "saxo" Sina = "sina" SinaFuture = "sinafuture" Tdx = "tdx" Btc = "btc" CFix = "cfix" Polo = "polo" Bty = "bty" Huobi = "huobi" Yunbi = "yunbi" Chbtc = "chbtc" ) const ( IntLmax = iota + 1 IntOanda IntEasyForex IntCtp IntFix IntDzh IntSaxo IntSina IntSinaFuture IntTdx IntBtc IntCFix IntPolo IntBty IntHuobi IntYunbi IntChbtc IntCount ) func DataTypeName(i int) string { if i == IntLmax { return Lmax } if i == IntOanda { return Oanda } if i == IntEasyForex { return EasyForex } if i == IntCtp { return Ctp } if i == IntFix { return Fix } if i == IntDzh { return Dzh } if i == IntSaxo { return Saxo } if i == IntSina { return Sina } if i == IntSinaFuture { return SinaFuture } if i == IntTdx { return Tdx } if i == IntBtc { return Btc } if i == IntCFix { return CFix } if i == IntPolo { return Polo } if i == IntBty { return Bty } if i == IntHuobi { return Huobi } if i == IntYunbi { return Yunbi } if i == IntChbtc { return Chbtc } panic("error data type") } func TypeId(i string) int { if i == Lmax { return IntLmax } if i == Oanda { return IntOanda } if i == EasyForex { return IntEasyForex } if i == Ctp { return IntCtp } if i == Fix { return IntFix } if i == Dzh { return IntDzh } if i == Saxo { return IntSaxo } if i == Sina { return IntSina } if i == SinaFuture { return IntSinaFuture } if i == Tdx { return IntTdx } if i == Btc { return IntBtc } if i == CFix { return IntCFix } if i == Polo { return IntPolo } if i == Bty { return IntBty } if i == Huobi { return IntHuobi } if i == Yunbi { return IntYunbi } if i == Chbtc { return IntChbtc } panic("error data type") } // 期货交易所 const ( SHFE = "SHFE" // 上海期货交易所 CFFEX = "CFFEX" // 中国金融交易所 DEC = "DEC" // 大连商品交易所 CZCE = "CZCE" // 郑州商品交易所 ) // 证券交易所 const ( SHEX = "SH" // 上海证券交易所 SZEX = "SZ" // 深证证券交易所 ) const ( Custom = "custom" Forex = "forex" Futures = "futures" Securities = "securities" Btcs = "btcs" ) type Instrument struct { Id int64 `json:"id"` // ID = 前缀+原始ID Name string `json:"name"` // 名称 Type string `json:"type"` // 用来区分种类 ExId string `json:"exid"` // 交易所ID PriceInc float64 `json:"priceInc"` // 最小加价 Margin float64 `json:"margin"` // 保证金 StartTime int64 `json:"st"` // 上市时间 EndTime int64 `json:"et"` // 下市时间 } type Tick struct { Time int64 `json:"time"` InstrumentId int64 `json:"instrument_id"` Bid float64 `json:"bid"` Ask float64 `json:"ask"` Bidv float64 `json:"bidv"` Askv float64 `json:"askv"` Price float64 `json:"price"` // 最新价 Volume float64 `json:"volume"` // 本次成交量(增量) } type PP [2]float64 // [0]为价格, [1]为数量 // 市场深度 type Depth struct { Bids []PP `json:"bids"` // 申买 Asks []PP `json:"asks"` // 申卖 } // 实时行情数据 type Market struct { InsId int64 `json:"insId"` // 产品ID Timestamp int64 `json:"ts"` // 时间 Close float64 `json:"close"` // 昨日收盘价 Open float64 `json:"open"` // 今日开盘价 High float64 `json:"high"` // 当日最高价 Low float64 `json:"low"` // 当日最低价 AllVolume float64 `json:"allVolume"` // 当日成交量 AllAmount float64 `json:"allAmount"` // 成交额 LastPrice float64 `json:"last"` // 最新价 LastVolume float64 `json:"volume"` // 本次成交量(增量) Bids []PP `json:"bids"` // 申买 Asks []PP `json:"asks"` // 申卖 Type int32 `json:"type"` } func (mk *Market) ToTickGo() (*base.TickGo, error) { var tg base.TickGo tg.Time = int32(mk.Timestamp / 1000) tg.Ms = int16(mk.Timestamp % 1000) tg.Symbol = int16(mk.GetSymbolId()) if tg.Symbol == int16(-1) { return nil, errors.New("symbol error.") } if len(mk.Asks) != 0 { tg.Ask = float32(mk.Asks[0][0]) //m.LastPrice tg.Askv = float32(mk.Asks[0][1]) } if len(mk.Bids) != 0 { tg.Bid = float32(mk.Bids[0][0]) //m.Volume tg.Bidv = float32(mk.Bids[0][1]) } return &tg, nil } func (mk *Market) GetSymbolId() int { tmpIns := Instrument{} tmpIns.Id = mk.InsId ins := tkIns2mkIns(DataTypeName(int(mk.Type)), tmpIns) symbolName := ins.Id data := strings.SplitN(symbolName, "_", 2) if len(data) != 2 { return -1 } id, err := markinfo.SymbolId(strings.ToUpper(data[1])) if err != nil { return -1 } return id } type Market2 struct { InsId int64 `json:"insId"` // 产品ID Timestamp int64 `json:"ts"` // 时间 Close float64 `json:"close"` // 昨日收盘价 Open float64 `json:"open"` // 今日开盘价 High float64 `json:"high"` // 当日最高价 Low float64 `json:"low"` // 当日最低价 AllVolume float64 `json:"allVolume"` // 当日成交量 AllAmount float64 `json:"allAmount"` // 成交额 LastPrice float64 `json:"last"` // 最新价 LastVolume float64 `json:"volume"` // 本次成交量(增量) Bids [10]PP `json:"bids"` // 申买 Asks [10]PP `json:"asks"` // 申卖 Type int64 `json:"type"` } type TickIndex struct { Begtime int64 `json:"begtime"` Endtime int64 `json:"endtime"` Path string `json:"path"` Ty string `json:"ty"` Tickcount int64 `json:"tickcount"` Totalcount int64 `json:"totalcount"` } func ticks(w http.ResponseWriter, r *http.Request) { req := TypeRequest{} err := decodeRequest(w, r, &req) //log.Println("ticks", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg := client.SendMessage(msq.MsgTKGetTicks, &req) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } sendResponse(w, r, msg.GetData()) } func instruments(w http.ResponseWriter, r *http.Request) { req := InstrumentsRequest{} err := decodeRequest(w, r, &req) //log.Println("instruments", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg := client.SendMessage(msq.MsgInss, &req) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } sendResponse(w, r, msg.GetData()) } func instrument(w http.ResponseWriter, r *http.Request) { req := InstrumentRequest{} err := decodeRequest(w, r, &req) //log.Println("instrument", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg := client.SendMessage(msq.MsgIns, &req) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } sendResponse(w, r, msg.GetData()) } func download(w http.ResponseWriter, r *http.Request) { //read post data req := DownloadRequest{} err := decodeRequest(w, r, &req) //log.Println("download", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg1 := client.NewMessage(msq.MsgTKDown, &req) //加载账户到内存 msg := client.SendMessages(msg1) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } sendResponse(w, r, msg1.GetData()) } func history(w http.ResponseWriter, r *http.Request) { //read post data req := DownloadRequest{} err := decodeRequest(w, r, &req) //log.Println("history", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg1 := client.NewMessage(msq.MsgTKHis, &req) //加载账户到内存 msg := client.SendMessages(msg1) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } sendResponse(w, r, msg1.GetData()) } func readChan(ch chan *Market, buf []*Market) (n int) { var i = 1 buf[0] = <-ch for { if i == len(buf) { return i } select { case data := <-ch: buf[i] = data i++ default: return i } } panic("nerver reach") } func stream(w http.ResponseWriter, r *http.Request) { req := StreamRequest{} err := decodeRequest(w, r, &req) //log.Println("stream", req) if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } client := tserver.GetClient() msg := client.SendMessage(msq.MsgTKSub, &req) err = msg.GetErr() if err != nil { sendErrResponse(w, r, http.StatusOK, err.Error()) return } realdata := msg.GetData().(*Subscribe) defer realdata.Close() buffer := make([]*Market, 1024) encoder := json.NewEncoder(w) for { n := readChan(realdata.chmk, buffer) //if req.Type == Tdx { //log.Println("[stream.readChan]data trace", n) //} start := 0 if req.LastCount > 0 { start = n - req.LastCount } if start < 0 { start = 0 } for i := start; i < n; i++ { //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 { //log.Println("[stream write begin]data trace") //} //log.Println("stream fuck", start, n) err := encoder.Encode(buffer[i]) //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 { //log.Println("[stream write end]data trace") //} if err != nil { //log.Println(err) return } //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 { //log.Println("[stream send]data trace") //} } //if req.Type == Tdx { //log.Println("[stream flush begin]data trace", n) //} w.(http.Flusher).Flush() //if req.Type == Tdx { //log.Println("[stream flush end]data trace", n) //} } }