server.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. package tick
  2. import "errors"
  3. import "net/http"
  4. import "github.com/niniwzw/http2"
  5. import "os"
  6. import "log"
  7. import "tickserver/framework/msq"
  8. import _ "github.com/go-sql-driver/mysql"
  9. import "os/signal"
  10. import "encoding/json"
  11. import "strings"
  12. import "compress/gzip"
  13. import "io"
  14. import "io/ioutil"
  15. import "net"
  16. import "crypto/tls"
  17. import "time"
  18. import "tickserver/framework/base"
  19. import "tickserver/markinfo"
// Blank reference to json.Marshal; it keeps the encoding/json import in use.
var _ = json.Marshal
  21. func readcert(file string) ([]byte, error) {
  22. return ioutil.ReadFile(file)
  23. }
  24. func serveProdTLS(port string) error {
  25. certPem, err := readcert("macoin.org.crt")
  26. if err != nil {
  27. return err
  28. }
  29. keyPem, err := readcert("macoin.org.key")
  30. if err != nil {
  31. return err
  32. }
  33. cert, err := tls.X509KeyPair(certPem, keyPem)
  34. if err != nil {
  35. return err
  36. }
  37. srv := &http.Server{
  38. TLSConfig: &tls.Config{
  39. Certificates: []tls.Certificate{cert},
  40. },
  41. }
  42. http2.ConfigureServer(srv, &http2.Server{})
  43. ln, err := net.Listen("tcp", port)
  44. if err != nil {
  45. return err
  46. }
  47. //log.Println("listen tcp", port)
  48. return srv.Serve(tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, srv.TLSConfig))
  49. }
  50. type tcpKeepAliveListener struct {
  51. *net.TCPListener
  52. }
  53. func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
  54. tc, err := ln.AcceptTCP()
  55. if err != nil {
  56. return
  57. }
  58. tc.SetKeepAlive(true)
  59. tc.SetKeepAlivePeriod(3 * time.Minute)
  60. return tc, nil
  61. }
  62. func signalHandle(lockname string) {
  63. c := make(chan os.Signal, 1)
  64. signal.Notify(c, os.Interrupt, os.Kill)
  65. // Block until a signal is received.
  66. s := <-c
  67. log.Println("Got signal:", s)
  68. os.Exit(0)
  69. }
  70. type gzipResponseWriter struct {
  71. io.Writer
  72. http.ResponseWriter
  73. }
  74. func (w gzipResponseWriter) Write(b []byte) (int, error) {
  75. return w.Writer.Write(b)
  76. }
  77. func (w gzipResponseWriter) Flush() {
  78. w.Writer.(*gzip.Writer).Flush()
  79. w.ResponseWriter.(http.Flusher).Flush()
  80. }
  81. func makeGzipHandler(fn http.HandlerFunc) http.HandlerFunc {
  82. return func(w http.ResponseWriter, r *http.Request) {
  83. if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
  84. fn(w, r)
  85. return
  86. }
  87. w.Header().Set("Content-Encoding", "gzip")
  88. w.Header().Set("Content-Type", "text/javascript")
  89. gz := gzip.NewWriter(w)
  90. defer gz.Close()
  91. fn(gzipResponseWriter{Writer: gz, ResponseWriter: w}, r)
  92. }
  93. }
  94. func RunServer(addr string, logfile string, isssl bool) error {
  95. go signalHandle("runserver")
  96. //go http.ListenAndServe(":8080", http.FileServer(http.Dir(serverconf.DataDir)))
  97. logw, e := os.Create(logfile)
  98. if e != nil {
  99. log.Fatal(e)
  100. }
  101. defer logw.Close()
  102. log.SetOutput(logw)
  103. //订单
  104. http.HandleFunc("/ticks", makeGzipHandler(ticks))
  105. http.HandleFunc("/download", makeGzipHandler(download))
  106. http.HandleFunc("/history", makeGzipHandler(history))
  107. http.HandleFunc("/stream", makeGzipHandler(stream))
  108. http.HandleFunc("/instruments", makeGzipHandler(instruments))
  109. http.HandleFunc("/instrument", makeGzipHandler(instrument))
  110. if isssl {
  111. err := serveProdTLS(addr)
  112. if err != nil {
  113. panic(err)
  114. }
  115. } else {
  116. s := &http.Server{
  117. Addr: addr,
  118. MaxHeaderBytes: 1 << 20,
  119. }
  120. s.ListenAndServe()
  121. }
  122. return nil
  123. }
  124. type TypeRequest struct {
  125. Type string `json:"type"`
  126. }
  127. type InstrumentsRequest struct {
  128. Type string `json:"type"`
  129. }
  130. type InstrumentRequest struct {
  131. Type string `json:"type"`
  132. Id int64 `json:"id"`
  133. }
  134. type StreamRequest struct {
  135. Type string `json:"type"`
  136. LastCount int `json:"lastcount"` //chan中数据太多的情况下,要抛弃部分数据,这个是要保留的数据,如果是0 的话,表示保留所有的数据
  137. }
  138. //分页下载
  139. type DownloadRequest struct {
  140. Type string `json:"type"`
  141. Start int64 `json:"start"`
  142. End int64 `json:"end"`
  143. Offset int `json:"offset"`
  144. Count int `json:"count"`
  145. OrderBy string `json:"orderby"`
  146. }
  147. const (
  148. Lmax = "lmax"
  149. Oanda = "oanda"
  150. EasyForex = "easyforex"
  151. Ctp = "ctp"
  152. Fix = "fix"
  153. Dzh = "dzh"
  154. Saxo = "saxo"
  155. Sina = "sina"
  156. SinaFuture = "sinafuture"
  157. Tdx = "tdx"
  158. Btc = "btc"
  159. CFix = "cfix"
  160. Polo = "polo"
  161. Bty = "bty"
  162. Huobi = "huobi"
  163. Yunbi = "yunbi"
  164. Chbtc = "chbtc"
  165. )
  166. const (
  167. IntLmax = iota + 1
  168. IntOanda
  169. IntEasyForex
  170. IntCtp
  171. IntFix
  172. IntDzh
  173. IntSaxo
  174. IntSina
  175. IntSinaFuture
  176. IntTdx
  177. IntBtc
  178. IntCFix
  179. IntPolo
  180. IntBty
  181. IntHuobi
  182. IntYunbi
  183. IntChbtc
  184. IntCount
  185. )
  186. func DataTypeName(i int) string {
  187. if i == IntLmax {
  188. return Lmax
  189. }
  190. if i == IntOanda {
  191. return Oanda
  192. }
  193. if i == IntEasyForex {
  194. return EasyForex
  195. }
  196. if i == IntCtp {
  197. return Ctp
  198. }
  199. if i == IntFix {
  200. return Fix
  201. }
  202. if i == IntDzh {
  203. return Dzh
  204. }
  205. if i == IntSaxo {
  206. return Saxo
  207. }
  208. if i == IntSina {
  209. return Sina
  210. }
  211. if i == IntSinaFuture {
  212. return SinaFuture
  213. }
  214. if i == IntTdx {
  215. return Tdx
  216. }
  217. if i == IntBtc {
  218. return Btc
  219. }
  220. if i == IntCFix {
  221. return CFix
  222. }
  223. if i == IntPolo {
  224. return Polo
  225. }
  226. if i == IntBty {
  227. return Bty
  228. }
  229. if i == IntHuobi {
  230. return Huobi
  231. }
  232. if i == IntYunbi {
  233. return Yunbi
  234. }
  235. if i == IntChbtc {
  236. return Chbtc
  237. }
  238. panic("error data type")
  239. }
  240. func TypeId(i string) int {
  241. if i == Lmax {
  242. return IntLmax
  243. }
  244. if i == Oanda {
  245. return IntOanda
  246. }
  247. if i == EasyForex {
  248. return IntEasyForex
  249. }
  250. if i == Ctp {
  251. return IntCtp
  252. }
  253. if i == Fix {
  254. return IntFix
  255. }
  256. if i == Dzh {
  257. return IntDzh
  258. }
  259. if i == Saxo {
  260. return IntSaxo
  261. }
  262. if i == Sina {
  263. return IntSina
  264. }
  265. if i == SinaFuture {
  266. return IntSinaFuture
  267. }
  268. if i == Tdx {
  269. return IntTdx
  270. }
  271. if i == Btc {
  272. return IntBtc
  273. }
  274. if i == CFix {
  275. return IntCFix
  276. }
  277. if i == Polo {
  278. return IntPolo
  279. }
  280. if i == Bty {
  281. return IntBty
  282. }
  283. if i == Huobi {
  284. return IntHuobi
  285. }
  286. if i == Yunbi {
  287. return IntYunbi
  288. }
  289. if i == Chbtc {
  290. return IntChbtc
  291. }
  292. panic("error data type")
  293. }
  294. // 期货交易所
  295. const (
  296. SHFE = "SHFE" // 上海期货交易所
  297. CFFEX = "CFFEX" // 中国金融交易所
  298. DEC = "DEC" // 大连商品交易所
  299. CZCE = "CZCE" // 郑州商品交易所
  300. )
  301. // 证券交易所
  302. const (
  303. SHEX = "SH" // 上海证券交易所
  304. SZEX = "SZ" // 深证证券交易所
  305. )
  306. const (
  307. Custom = "custom"
  308. Forex = "forex"
  309. Futures = "futures"
  310. Securities = "securities"
  311. Btcs = "btcs"
  312. )
  313. type Instrument struct {
  314. Id int64 `json:"id"` // ID = 前缀+原始ID
  315. Name string `json:"name"` // 名称
  316. Type string `json:"type"` // 用来区分种类
  317. ExId string `json:"exid"` // 交易所ID
  318. PriceInc float64 `json:"priceInc"` // 最小加价
  319. Margin float64 `json:"margin"` // 保证金
  320. StartTime int64 `json:"st"` // 上市时间
  321. EndTime int64 `json:"et"` // 下市时间
  322. }
  323. type Tick struct {
  324. Time int64 `json:"time"`
  325. InstrumentId int64 `json:"instrument_id"`
  326. Bid float64 `json:"bid"`
  327. Ask float64 `json:"ask"`
  328. Bidv float64 `json:"bidv"`
  329. Askv float64 `json:"askv"`
  330. Price float64 `json:"price"` // 最新价
  331. Volume float64 `json:"volume"` // 本次成交量(增量)
  332. }
  333. type PP [2]float64 // [0]为价格, [1]为数量
  334. // 市场深度
  335. type Depth struct {
  336. Bids []PP `json:"bids"` // 申买
  337. Asks []PP `json:"asks"` // 申卖
  338. }
  339. // 实时行情数据
  340. type Market struct {
  341. InsId int64 `json:"insId"` // 产品ID
  342. Timestamp int64 `json:"ts"` // 时间
  343. Close float64 `json:"close"` // 昨日收盘价
  344. Open float64 `json:"open"` // 今日开盘价
  345. High float64 `json:"high"` // 当日最高价
  346. Low float64 `json:"low"` // 当日最低价
  347. AllVolume float64 `json:"allVolume"` // 当日成交量
  348. AllAmount float64 `json:"allAmount"` // 成交额
  349. LastPrice float64 `json:"last"` // 最新价
  350. LastVolume float64 `json:"volume"` // 本次成交量(增量)
  351. Bids []PP `json:"bids"` // 申买
  352. Asks []PP `json:"asks"` // 申卖
  353. Type int32 `json:"type"`
  354. }
  355. func (mk *Market) ToTickGo() (*base.TickGo, error) {
  356. var tg base.TickGo
  357. tg.Time = int32(mk.Timestamp / 1000)
  358. tg.Ms = int16(mk.Timestamp % 1000)
  359. tg.Symbol = int16(mk.GetSymbolId())
  360. if tg.Symbol == int16(-1) {
  361. return nil, errors.New("symbol error.")
  362. }
  363. if len(mk.Asks) != 0 {
  364. tg.Ask = float32(mk.Asks[0][0]) //m.LastPrice
  365. tg.Askv = float32(mk.Asks[0][1])
  366. }
  367. if len(mk.Bids) != 0 {
  368. tg.Bid = float32(mk.Bids[0][0]) //m.Volume
  369. tg.Bidv = float32(mk.Bids[0][1])
  370. }
  371. return &tg, nil
  372. }
  373. func (mk *Market) GetSymbolId() int {
  374. tmpIns := Instrument{}
  375. tmpIns.Id = mk.InsId
  376. ins := tkIns2mkIns(DataTypeName(int(mk.Type)), tmpIns)
  377. symbolName := ins.Id
  378. data := strings.SplitN(symbolName, "_", 2)
  379. if len(data) != 2 {
  380. return -1
  381. }
  382. id, err := markinfo.SymbolId(strings.ToUpper(data[1]))
  383. if err != nil {
  384. return -1
  385. }
  386. return id
  387. }
  388. type Market2 struct {
  389. InsId int64 `json:"insId"` // 产品ID
  390. Timestamp int64 `json:"ts"` // 时间
  391. Close float64 `json:"close"` // 昨日收盘价
  392. Open float64 `json:"open"` // 今日开盘价
  393. High float64 `json:"high"` // 当日最高价
  394. Low float64 `json:"low"` // 当日最低价
  395. AllVolume float64 `json:"allVolume"` // 当日成交量
  396. AllAmount float64 `json:"allAmount"` // 成交额
  397. LastPrice float64 `json:"last"` // 最新价
  398. LastVolume float64 `json:"volume"` // 本次成交量(增量)
  399. Bids [10]PP `json:"bids"` // 申买
  400. Asks [10]PP `json:"asks"` // 申卖
  401. Type int64 `json:"type"`
  402. }
  403. type TickIndex struct {
  404. Begtime int64 `json:"begtime"`
  405. Endtime int64 `json:"endtime"`
  406. Path string `json:"path"`
  407. Ty string `json:"ty"`
  408. Tickcount int64 `json:"tickcount"`
  409. Totalcount int64 `json:"totalcount"`
  410. }
  411. func ticks(w http.ResponseWriter, r *http.Request) {
  412. req := TypeRequest{}
  413. err := decodeRequest(w, r, &req)
  414. //log.Println("ticks", req)
  415. if err != nil {
  416. sendErrResponse(w, r, http.StatusOK, err.Error())
  417. return
  418. }
  419. client := tserver.GetClient()
  420. msg := client.SendMessage(msq.MsgTKGetTicks, &req)
  421. err = msg.GetErr()
  422. if err != nil {
  423. sendErrResponse(w, r, http.StatusOK, err.Error())
  424. return
  425. }
  426. sendResponse(w, r, msg.GetData())
  427. }
  428. func instruments(w http.ResponseWriter, r *http.Request) {
  429. req := InstrumentsRequest{}
  430. err := decodeRequest(w, r, &req)
  431. //log.Println("instruments", req)
  432. if err != nil {
  433. sendErrResponse(w, r, http.StatusOK, err.Error())
  434. return
  435. }
  436. client := tserver.GetClient()
  437. msg := client.SendMessage(msq.MsgInss, &req)
  438. err = msg.GetErr()
  439. if err != nil {
  440. sendErrResponse(w, r, http.StatusOK, err.Error())
  441. return
  442. }
  443. sendResponse(w, r, msg.GetData())
  444. }
  445. func instrument(w http.ResponseWriter, r *http.Request) {
  446. req := InstrumentRequest{}
  447. err := decodeRequest(w, r, &req)
  448. //log.Println("instrument", req)
  449. if err != nil {
  450. sendErrResponse(w, r, http.StatusOK, err.Error())
  451. return
  452. }
  453. client := tserver.GetClient()
  454. msg := client.SendMessage(msq.MsgIns, &req)
  455. err = msg.GetErr()
  456. if err != nil {
  457. sendErrResponse(w, r, http.StatusOK, err.Error())
  458. return
  459. }
  460. sendResponse(w, r, msg.GetData())
  461. }
  462. func download(w http.ResponseWriter, r *http.Request) {
  463. //read post data
  464. req := DownloadRequest{}
  465. err := decodeRequest(w, r, &req)
  466. //log.Println("download", req)
  467. if err != nil {
  468. sendErrResponse(w, r, http.StatusOK, err.Error())
  469. return
  470. }
  471. client := tserver.GetClient()
  472. msg1 := client.NewMessage(msq.MsgTKDown, &req) //加载账户到内存
  473. msg := client.SendMessages(msg1)
  474. err = msg.GetErr()
  475. if err != nil {
  476. sendErrResponse(w, r, http.StatusOK, err.Error())
  477. return
  478. }
  479. sendResponse(w, r, msg1.GetData())
  480. }
  481. func history(w http.ResponseWriter, r *http.Request) {
  482. //read post data
  483. req := DownloadRequest{}
  484. err := decodeRequest(w, r, &req)
  485. //log.Println("history", req)
  486. if err != nil {
  487. sendErrResponse(w, r, http.StatusOK, err.Error())
  488. return
  489. }
  490. client := tserver.GetClient()
  491. msg1 := client.NewMessage(msq.MsgTKHis, &req) //加载账户到内存
  492. msg := client.SendMessages(msg1)
  493. err = msg.GetErr()
  494. if err != nil {
  495. sendErrResponse(w, r, http.StatusOK, err.Error())
  496. return
  497. }
  498. sendResponse(w, r, msg1.GetData())
  499. }
  500. func readChan(ch chan *Market, buf []*Market) (n int) {
  501. var i = 1
  502. buf[0] = <-ch
  503. for {
  504. if i == len(buf) {
  505. return i
  506. }
  507. select {
  508. case data := <-ch:
  509. buf[i] = data
  510. i++
  511. default:
  512. return i
  513. }
  514. }
  515. panic("nerver reach")
  516. }
  517. func stream(w http.ResponseWriter, r *http.Request) {
  518. req := StreamRequest{}
  519. err := decodeRequest(w, r, &req)
  520. //log.Println("stream", req)
  521. if err != nil {
  522. sendErrResponse(w, r, http.StatusOK, err.Error())
  523. return
  524. }
  525. client := tserver.GetClient()
  526. msg := client.SendMessage(msq.MsgTKSub, &req)
  527. err = msg.GetErr()
  528. if err != nil {
  529. sendErrResponse(w, r, http.StatusOK, err.Error())
  530. return
  531. }
  532. realdata := msg.GetData().(*Subscribe)
  533. defer realdata.Close()
  534. buffer := make([]*Market, 1024)
  535. encoder := json.NewEncoder(w)
  536. for {
  537. n := readChan(realdata.chmk, buffer)
  538. //if req.Type == Tdx {
  539. //log.Println("[stream.readChan]data trace", n)
  540. //}
  541. start := 0
  542. if req.LastCount > 0 {
  543. start = n - req.LastCount
  544. }
  545. if start < 0 {
  546. start = 0
  547. }
  548. for i := start; i < n; i++ {
  549. //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
  550. //log.Println("[stream write begin]data trace")
  551. //}
  552. //log.Println("stream fuck", start, n)
  553. err := encoder.Encode(buffer[i])
  554. //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
  555. //log.Println("[stream write end]data trace")
  556. //}
  557. if err != nil {
  558. //log.Println(err)
  559. return
  560. }
  561. //if buffer[i].Type == IntTdx && buffer[i].InsId == 1 {
  562. //log.Println("[stream send]data trace")
  563. //}
  564. }
  565. //if req.Type == Tdx {
  566. //log.Println("[stream flush begin]data trace", n)
  567. //}
  568. w.(http.Flusher).Flush()
  569. //if req.Type == Tdx {
  570. //log.Println("[stream flush end]data trace", n)
  571. //}
  572. }
  573. }