使用Java 11 WebSocket API的Websocket客戶端

買賣虛擬貨幣

對於一個小型加密專案,我想使用Bitfinex WebSocket API實時獲取市場資料。

從Java SE 11開始,JDK包含一個客戶端WebSocket API。Javadoc包含一些程式碼示例,但是您不能立即使用這些示例。在網上搜尋“ java websocket client”將主要顯示有關舊JSR 352 websocket的示例和指南。設定一切以完成工作並不像預期的那樣簡單,因此我編寫了這個小教程。

只要適合,我都會在專案中使用vert.x,並且也有一個websocket客戶端API。但是可悲的是vert.x websocket客戶端有一些缺點,它不適用於重定向儘管我們不使用vert.x websocket客戶端,但我們將vert.x用作小型應用程式的基礎。

夠多了,我們開始編碼。要構建並連線到Websocket伺服器,構建器需要一個偵聽器,該偵聽器將偵聽傳入的資料包:

class BitfinexListener(val vertx: Vertx) : WebSocket.Listener {

    override fun onOpen(webSocket: WebSocket?) {
        super.onOpen(webSocket)
        LOGGER.info("websocket opened")
        this.vertx.periodicStream(60000).toObservable()
          .subscribe { i ->
            val pingTxt = JsonObject().put("event", "ping")
                           .put("cid", Random(2020).nextInt())
                           .encode()
            webSocket?.sendText(pingTxt, true)?
             .thenRun {-> LOGGER.info("sent ping {}", pingTxt)}
          }
    }

    var parts: MutableList<CharSequence?> = 
            MutableList(0) { index: Int -> "" }
    var accumulatedMessage: CompletableFuture<*> =
            CompletableFuture<Any>()

    override fun onText(webSocket: WebSocket,
                         message: CharSequence?,
                         last: Boolean): CompletionStage<*>? {
        parts.add(message)
        webSocket.request(1)
        if (last) {
            val completeMessage = parts.joinToString(separator = "") 
                   { charSequence -> charSequence ?: "" }
            parts.clear()
            accumulatedMessage.complete(null)
            val cf: CompletionStage<*> = accumulatedMessage
            accumulatedMessage = CompletableFuture<Any>()
            onMessage(completeMessage)
            return cf
        }
        return accumulatedMessage
    }

    fun onMessage(message: String) {
        val bitfinexMessage = Json.decodeValue(message)
        // ... see repo at github for full code
    }
}

我們重寫onOpen方法以建立對bitfinex的定期ping。重要的一件事是,當您覆蓋onOpen時,必須呼叫super.onOpen,否則客戶端不會向伺服器傳送任何資料。

我花了很多時間才發現這個錯誤。由於偵聽器是一個介面,所以我不習慣於呼叫介面的超級方法。但在這種情況下這很重要。java8中引入的介面中預設方法的概念對我來說還沒有成為第二天性。

onText訊息將收集所有傳輸的文字資料,直到文字完成為止(通常是一次呼叫inText的情況)。文字完成後,將透過onMessage方法中的vert.x事件匯流排傳送文字。

現在,我們可以使用此偵聽器設定一個表示與bitfinex的連線的頂點:

class BitfinexConnection : AbstractVerticle() {

    var webSocket: WebSocket? = null

    override fun start() {
        LOGGER.info("deploying BitfinexConnection")
        val subs = vertx.sharedData()
                .getLocalMap<Int,String>("bitfinex.subscriptions")
        val listener = BitfinexListener(this.vertx, subs)
        val client = HttpClient.newHttpClient()
        val uri = URI.create("wss://api-pub.bitfinex.com/ws/2")
        this.webSocket = client.newWebSocketBuilder()
                     .buildAsync(uri), listener).join()
        vertx.eventBus()
             .consumer<JsonObject(BITFINEX_EB_ADDRESS)
             .handler { jsonMsg ->

          if ( webSocket==null || webSocket?.isOutputClosed()!!) {
              jsonMsg.reply(JsonObject()
                            .put("message", "websocket closed")
                            .put("statusCode",503))
                admin@chaindaily
          }
          val bitfinexMessage = jsonMsg.body().encode()
          this.webSocket?.sendText(bitfinexMessage,true)?
           .thenRun{ LOGGER.debug("delivered {} ", bitfinexMessage)}
        }
    }
}

該類非常簡單明瞭,在verticle的start方法中,將初始化並啟動與公共bitfinex api的websocket連線。應透過vert.x事件匯流排將訊息直接傳送到websocket,因此我們在地址BITFINEX_EB_ADDRESS上啟動使用者。

在最後一步,我們將所有內容放在一起,並嘗試訂閱tBTCUSD程式碼:

fun main() {
  val vertx = Vertx.vertx()
  val symbol = "tBTCUSD"
  vertx.rxDeployVerticle(BitfinexConnection::class.java.name)
    .subscribe(
        { id ->
            LOGGER.info("deployed bitfinex connection {}", id)
            val address = "ticker." + symbol
            vertx.eventBus()
               .consumer<JsonArray>(address)
               .handler { jsonMsg ->
                  LOGGER.info("received {} {}", address,
                        jsonMsg.body().encodePrettily())
               }
            val subscribeMessage = JsonObject()
                  .bfxSubscribeTickerMessage(symbol)           
            vertx.eventBus()
                .send(BITFINEX_EB_ADDRESS, subscribeMessage)

       },
       { t: Throwable? -> 
         LOGGER.error("deployment failed", t) }
   )
}

main方法啟動Vertx並部署bitfinex Websocket Verticle。部署完成後,我們將帶有訂閱有效負載的訊息傳送到websocket verticle,並在特殊的eventbus地址上啟動使用者。

您可以在github上找到所有程式碼,對其進行克隆,以確保已安裝Java 11或更高版本,並使用以下命令執行它:

./gradlew run

我希望本教程將為您快速啟動專案中的websocket客戶端程式碼。

免責聲明:

  1. 本文版權歸原作者所有,僅代表作者本人觀點,不代表鏈報觀點或立場。
  2. 如發現文章、圖片等侵權行爲,侵權責任將由作者本人承擔。
  3. 鏈報僅提供相關項目信息,不構成任何投資建議

推荐阅读

;