123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- package tick
- import "errors"
- import "net/http"
- import "github.com/niniwzw/http2"
- import "os"
- import "log"
- import "tickserver/framework/msq"
- import _ "github.com/go-sql-driver/mysql"
- import "os/signal"
- import "encoding/json"
- import "strings"
- import "compress/gzip"
- import "io"
- import "io/ioutil"
- import "net"
- import "crypto/tls"
- import "time"
- import "tickserver/framework/base"
- import "tickserver/markinfo"
- var _ = json.Marshal
// readcert reads the whole file at the given path and returns its bytes.
// Any error from ioutil.ReadFile is returned unchanged.
func readcert(file string) ([]byte, error) {
	pem, err := ioutil.ReadFile(file)
	return pem, err
}
- func serveProdTLS(port string) error {
- certPem, err := readcert("macoin.org.crt")
- if err != nil {
- return err
- }
- keyPem, err := readcert("macoin.org.key")
- if err != nil {
- return err
- }
- cert, err := tls.X509KeyPair(certPem, keyPem)
- if err != nil {
- return err
- }
- srv := &http.Server{
- TLSConfig: &tls.Config{
- Certificates: []tls.Certificate{cert},
- },
- }
- http2.ConfigureServer(srv, &http2.Server{})
- ln, err := net.Listen("tcp", port)
- if err != nil {
- return err
- }
- //log.Println("listen tcp", port)
- return srv.Serve(tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, srv.TLSConfig))
- }
// tcpKeepAliveListener wraps a *net.TCPListener so that every connection it
// accepts has TCP keep-alive enabled.
type tcpKeepAliveListener struct {
	*net.TCPListener
}

// Accept waits for the next TCP connection, turns keep-alive on with a
// three-minute period and returns the connection. If accepting fails the
// error is returned with a nil connection. Errors from the two keep-alive
// setters are ignored.
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
	conn, acceptErr := ln.AcceptTCP()
	if acceptErr != nil {
		return nil, acceptErr
	}
	conn.SetKeepAlive(true)
	conn.SetKeepAlivePeriod(3 * time.Minute)
	return conn, nil
}
// signalHandle blocks until the process receives an interrupt signal, logs
// the signal and terminates the process with exit status 0.
//
// lockname is not used by the body; it is kept so existing call sites keep
// compiling.
//
// Only os.Interrupt is registered: os.Kill (SIGKILL) can never be delivered
// to a handler, so subscribing to it had no effect.
func signalHandle(lockname string) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// Block until a signal is received.
	s := <-c
	log.Println("Got signal:", s)
	os.Exit(0)
}
// gzipResponseWriter is an http.ResponseWriter whose body bytes are sent to
// Writer (normally a *gzip.Writer layered over the real response) while
// Header and WriteHeader come from the embedded ResponseWriter.
type gzipResponseWriter struct {
	io.Writer
	http.ResponseWriter
}

// Write sends b to the embedded Writer. It is declared explicitly because
// both embedded fields provide a Write method, which would otherwise be
// ambiguous.
func (w gzipResponseWriter) Write(b []byte) (int, error) {
	return w.Writer.Write(b)
}

// Flush pushes buffered compressed data to the underlying response and then
// flushes the response itself. Each step is skipped when the corresponding
// value does not support it, instead of panicking on a failed type
// assertion. http.Flusher has no error return, so a gzip flush error cannot
// be reported here; a later Write on the same writer will fail instead.
func (w gzipResponseWriter) Flush() {
	if gz, ok := w.Writer.(*gzip.Writer); ok {
		gz.Flush()
	}
	if f, ok := w.ResponseWriter.(http.Flusher); ok {
		f.Flush()
	}
}
- func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- fn(w, r)
- return
- }
- w.Header().Set("Content-Encoding", "gzip")
- w.Header().Set("Content-Type", "text/javascript")
- gz := gzip.NewWriter(w)
- defer gz.Close()
- fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
- }
- }
- func RunServer(addr string, logfile string, isssl bool) error {
- go signalHandle("runserver")
- //go http.ListenAndServe(":8080", http.FileServer(http.Dir(serverconf.DataDir)))
- logw, e := os.Create(logfile)
- if e != nil {
- log.Fatal(e)
- }
- defer logw.Close()
- log.SetOutput(logw)
- //订单
- http.HandleFunc("/ticks", makeGzipHandler(ticks))
- http.HandleFunc("/download", makeGzipHandler(download))
- http.HandleFunc("/history", makeGzipHandler(history))
- http.HandleFunc("/stream", makeGzipHandler(stream))
- http.HandleFunc("/instruments", makeGzipHandler(instruments))
- http.HandleFunc("/instrument", makeGzipHandler(instrument))
- if isssl {
- err := serveProdTLS(addr)
- if err != nil {
- panic(err)
- }
- } else {
- s := &http.Server{
- Addr: addr,
- MaxHeaderBytes: 1 << 20,
- }
- s.ListenAndServe()
- }
- return nil
- }
// TypeRequest is the JSON request body decoded by the ticks handler. It
// carries a single "type" string.
type TypeRequest struct {
	Type string `json:"type"`
}
// InstrumentsRequest is the JSON request body decoded by the instruments
// handler. It carries a single "type" string.
type InstrumentsRequest struct {
	Type string `json:"type"`
}
// InstrumentRequest is the JSON request body decoded by the instrument
// handler: a "type" string plus the numeric "id" of one instrument.
type InstrumentRequest struct {
	Type string `json:"type"`
	Id   int64  `json:"id"`
}
// StreamRequest is the JSON request body decoded by the stream handler.
type StreamRequest struct {
	Type string `json:"type"`

	// LastCount is how many of the most recent items to keep when the
	// channel has accumulated too much data and part of it must be dropped.
	// 0 means keep everything.
	LastCount int `json:"lastcount"`
}
// DownloadRequest is the JSON request body for paged downloads. It is
// decoded by both the download and the history handlers.
type DownloadRequest struct {
	Type    string `json:"type"`
	Start   int64  `json:"start"`
	End     int64  `json:"end"`
	Offset  int    `json:"offset"`
	Count   int    `json:"count"`
	OrderBy string `json:"orderby"`
}
// String names of the supported data sources. Each name has a matching
// Int* identifier; TypeId and DataTypeName convert between the two forms.
const (
	Lmax       = "lmax"
	Oanda      = "oanda"
	EasyForex  = "easyforex"
	Ctp        = "ctp"
	Fix        = "fix"
	Dzh        = "dzh"
	Saxo       = "saxo"
	Sina       = "sina"
	SinaFuture = "sinafuture"
	Tdx        = "tdx"
	Btc        = "btc"
	CFix       = "cfix"
	Polo       = "polo"
	Bty        = "bty"
	Huobi      = "huobi"
	Yunbi      = "yunbi"
	Chbtc      = "chbtc"
)
// Numeric identifiers of the data sources, numbered from 1 in the same
// order as the string names. IntCount is one past the last valid
// identifier.
const (
	IntLmax = iota + 1
	IntOanda
	IntEasyForex
	IntCtp
	IntFix
	IntDzh
	IntSaxo
	IntSina
	IntSinaFuture
	IntTdx
	IntBtc
	IntCFix
	IntPolo
	IntBty
	IntHuobi
	IntYunbi
	IntChbtc
	IntCount
)
- func DataTypeName(i int) string {
- if i == IntLmax {
- return Lmax
- }
- if i == IntOanda {
- return Oanda
- }
- if i == IntEasyForex {
- return EasyForex
- }
- if i == IntCtp {
- return Ctp
- }
- if i == IntFix {
- return Fix
- }
- if i == IntDzh {
- return Dzh
- }
- if i == IntSaxo {
- return Saxo
- }
- if i == IntSina {
- return Sina
- }
- if i == IntSinaFuture {
- return SinaFuture
- }
- if i == IntTdx {
- return Tdx
- }
- if i == IntBtc {
- return Btc
- }
- if i == IntCFix {
- return CFix
- }
- if i == IntPolo {
- return Polo
- }
- if i == IntBty {
- return Bty
- }
- if i == IntHuobi {
- return Huobi
- }
- if i == IntYunbi {
- return Yunbi
- }
- if i == IntChbtc {
- return Chbtc
- }
- panic("error data type")
- }
- func TypeId(i string) int {
- if i == Lmax {
- return IntLmax
- }
- if i == Oanda {
- return IntOanda
- }
- if i == EasyForex {
- return IntEasyForex
- }
- if i == Ctp {
- return IntCtp
- }
- if i == Fix {
- return IntFix
- }
- if i == Dzh {
- return IntDzh
- }
- if i == Saxo {
- return IntSaxo
- }
- if i == Sina {
- return IntSina
- }
- if i == SinaFuture {
- return IntSinaFuture
- }
- if i == Tdx {
- return IntTdx
- }
- if i == Btc {
- return IntBtc
- }
- if i == CFix {
- return IntCFix
- }
- if i == Polo {
- return IntPolo
- }
- if i == Bty {
- return IntBty
- }
- if i == Huobi {
- return IntHuobi
- }
- if i == Yunbi {
- return IntYunbi
- }
- if i == Chbtc {
- return IntChbtc
- }
- panic("error data type")
- }
// Futures exchanges.
const (
	SHFE  = "SHFE"  // Shanghai Futures Exchange
	CFFEX = "CFFEX" // China Financial Futures Exchange
	DEC   = "DEC"   // Dalian Commodity Exchange
	CZCE  = "CZCE"  // Zhengzhou Commodity Exchange
)
// Stock exchanges.
const (
	SHEX = "SH" // Shanghai Stock Exchange
	SZEX = "SZ" // Shenzhen Stock Exchange
)
// Category names. NOTE(review): presumably the values stored in
// Instrument.Type; confirm against the code that fills that field.
const (
	Custom     = "custom"
	Forex      = "forex"
	Futures    = "futures"
	Securities = "securities"
	Btcs       = "btcs"
)
// Instrument describes one tradable instrument as exchanged in JSON.
type Instrument struct {
	Id        int64   `json:"id"`       // ID = prefix + original ID
	Name      string  `json:"name"`     // display name
	Type      string  `json:"type"`     // distinguishes the category
	ExId      string  `json:"exid"`     // exchange ID
	PriceInc  float64 `json:"priceInc"` // minimum price increment
	Margin    float64 `json:"margin"`   // margin
	StartTime int64   `json:"st"`       // listing time
	EndTime   int64   `json:"et"`       // delisting time
}
// Tick is a single quote/trade update for one instrument.
type Tick struct {
	Time         int64   `json:"time"`
	InstrumentId int64   `json:"instrument_id"`
	Bid          float64 `json:"bid"`
	Ask          float64 `json:"ask"`
	Bidv         float64 `json:"bidv"`
	Askv         float64 `json:"askv"`
	Price        float64 `json:"price"`  // latest price
	Volume       float64 `json:"volume"` // volume of this trade (incremental)
}
// PP is one price level: element [0] is the price, element [1] the quantity.
type PP [2]float64

// Depth is the market depth: the bid and ask price levels.
type Depth struct {
	Bids []PP `json:"bids"` // buy orders
	Asks []PP `json:"asks"` // sell orders
}
- // 实时行情数据
- type Market struct {
- InsId int64 `json:"insId"` // 产品ID
- Timestamp int64 `json:"ts"` // 时间
- Close float64 `json:"close"` // 昨日收盘价
- Open float64 `json:"open"` // 今日开盘价
- High float64 `json:"high"` // 当日最高价
- Low float64 `json:"low"` // 当日最低价
- AllVolume float64 `json:"allVolume"` // 当日成交量
- AllAmount float64 `json:"allAmount"` // 成交额
- LastPrice float64 `json:"last"` // 最新价
- LastVolume float64 `json:"volume"` // 本次成交量(增量)
- Bids []PP `json:"bids"` // 申买
- Asks []PP `json:"asks"` // 申卖
- Type int32 `json:"type"`
- }
- func (mk *Market) ToTickGo() (*base.TickGo, error) {
- var tg base.TickGo
- tg.Time = int32(mk.Timestamp / 1000)
- tg.Ms = int16(mk.Timestamp % 1000)
- tg.Symbol = int16(mk.GetSymbolId())
- if tg.Symbol == int16(-1) {
- return nil, errors.New("symbol error.")
- }
- if len(mk.Asks) != 0 {
- tg.Ask = float32(mk.Asks[0][0]) //m.LastPrice
- tg.Askv = float32(mk.Asks[0][1])
- }
- if len(mk.Bids) != 0 {
- tg.Bid = float32(mk.Bids[0][0]) //m.Volume
- tg.Bidv = float32(mk.Bids[0][1])
- }
- return &tg, nil
- }
- func (mk *Market) GetSymbolId() int {
- tmpIns := Instrument{}
- tmpIns.Id = mk.InsId
- ins := tkIns2mkIns(DataTypeName(int(mk.Type)), tmpIns)
- symbolName := ins.Id
- data := strings.SplitN(symbolName, "_", 2)
- if len(data) != 2 {
- return -1
- }
- id, err := markinfo.SymbolId(strings.ToUpper(data[1]))
- if err != nil {
- return -1
- }
- return id
- }
- type Market2 struct {
- InsId int64 `json:"insId"` // 产品ID
- Timestamp int64 `json:"ts"` // 时间
- Close float64 `json:"close"` // 昨日收盘价
- Open float64 `json:"open"` // 今日开盘价
- High float64 `json:"high"` // 当日最高价
- Low float64 `json:"low"` // 当日最低价
- AllVolume float64 `json:"allVolume"` // 当日成交量
- AllAmount float64 `json:"allAmount"` // 成交额
- LastPrice float64 `json:"last"` // 最新价
- LastVolume float64 `json:"volume"` // 本次成交量(增量)
- Bids [10]PP `json:"bids"` // 申买
- Asks [10]PP `json:"asks"` // 申卖
- Type int64 `json:"type"`
- }
// TickIndex is an index record as exchanged in JSON: a begin/end time pair,
// a path, a type string and two counters.
// NOTE(review): presumably it describes one stored tick file; confirm
// against the code that produces these records.
type TickIndex struct {
	Begtime    int64  `json:"begtime"`
	Endtime    int64  `json:"endtime"`
	Path       string `json:"path"`
	Ty         string `json:"ty"`
	Tickcount  int64  `json:"tickcount"`
	Totalcount int64  `json:"totalcount"`
}
- func ticks(w http.ResponseWriter, r *http.Request) {
- req := TypeRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("ticks", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg := client.SendMessage(msq.MsgTKGetTicks, &req)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- sendResponse(w, r, msg.GetData())
- }
- func instruments(w http.ResponseWriter, r *http.Request) {
- req := InstrumentsRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("instruments", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg := client.SendMessage(msq.MsgInss, &req)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- sendResponse(w, r, msg.GetData())
- }
- func instrument(w http.ResponseWriter, r *http.Request) {
- req := InstrumentRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("instrument", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg := client.SendMessage(msq.MsgIns, &req)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- sendResponse(w, r, msg.GetData())
- }
- func download(w http.ResponseWriter, r *http.Request) {
- //read post data
- req := DownloadRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("download", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg1 := client.NewMessage(msq.MsgTKDown, &req) //加载账户到内存
- msg := client.SendMessages(msg1)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- sendResponse(w, r, msg1.GetData())
- }
- func history(w http.ResponseWriter, r *http.Request) {
- //read post data
- req := DownloadRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("history", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg1 := client.NewMessage(msq.MsgTKHis, &req) //加载账户到内存
- msg := client.SendMessages(msg1)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- sendResponse(w, r, msg1.GetData())
- }
- func readChan(ch chan *Market, buf []*Market) (n int) {
- var i = 1
- buf[0] = <-ch
- for {
- if i == len(buf) {
- return i
- }
- select {
- case data := <-ch:
- buf[i] = data
- i++
- default:
- return i
- }
- }
- panic("nerver reach")
- }
- func stream(w http.ResponseWriter, r *http.Request) {
- req := StreamRequest{}
- err := decodeRequest(w, r, &req)
- //log.Println("stream", req)
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- client := tserver.GetClient()
- msg := client.SendMessage(msq.MsgTKSub, &req)
- err = msg.GetErr()
- if err != nil {
- sendErrResponse(w, r, http.StatusOK, err.Error())
- return
- }
- realdata := msg.GetData().(*Subscribe)
- defer realdata.Close()
- buffer := make([]*Market, 1024)
- encoder := json.NewEncoder(w)
- for {
- n := readChan(realdata.chmk, buffer)
- //if req.Type == Tdx {
- //log.Println("[stream.readChan]data trace", n)
- //}
- start := 0
- if req.LastCount > 0 {
- start = n - req.LastCount
- }
- if start < 0 {
- start = 0
- }
- for i := start; i < n; i++ {
- //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
- //log.Println("[stream write begin]data trace")
- //}
- //log.Println("stream fuck", start, n)
- err := encoder.Encode(buffer[i])
- //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
- //log.Println("[stream write end]data trace")
- //}
- if err != nil {
- //log.Println(err)
- return
- }
- //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
- //log.Println("[stream send]data trace")
- //}
- }
- //if req.Type == Tdx {
- //log.Println("[stream flush begin]data trace", n)
- //}
- w.(http.Flusher).Flush()
- //if req.Type == Tdx {
- //log.Println("[stream flush end]data trace", n)
- //}
- }
- }
|