色综合图-色综合图片-色综合图片二区150p-色综合图区-玖玖国产精品视频-玖玖香蕉视频

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

用Python進(jìn)行websocket接口測(cè)試

瀏覽:4日期:2022-07-08 09:09:31

我們?cè)谧鼋涌跍y(cè)試時(shí),除了常見(jiàn)的http接口,還有一種比較多見(jiàn),就是socket接口,今天講解下怎么用Python進(jìn)行websocket接口測(cè)試。

現(xiàn)在大多數(shù)用的都是websocket,那我們就先來(lái)安裝一下websocket的安裝包。

pip install websocket-client

用Python進(jìn)行websocket接口測(cè)試

安裝完之后,我們就開(kāi)始我們的websocket之旅了。

我們先來(lái)看個(gè)炒雞簡(jiǎn)單的栗子:

import websocketws = websocket.WebSocket()ws.connect('ws://example.com/websocket', http_proxy_host='proxy_host_name', http_proxy_port=3128)

這個(gè)栗子就是創(chuàng)建一個(gè)websocket連接,這個(gè)模塊支持通過(guò)http代理訪問(wèn)websocket。代理服務(wù)器允許使用connect方法連接到websocket端口。默認(rèn)的squid設(shè)置是“只允許連接HTTPS端口”。

在websocket里,我們有常用的這幾個(gè)方法:

on_message方法:

def on_message(ws, message): print(message)

on_message是用來(lái)接受消息的,server發(fā)送的所有消息都可以用on_message這個(gè)方法來(lái)收取。

on_error方法:

def on_error(ws, error): print(error)

這個(gè)方法是用來(lái)處理錯(cuò)誤異常的,如果一旦socket的程序出現(xiàn)了通信的問(wèn)題,就可以被這個(gè)方法捕捉到。

on_open方法:

def on_open(ws): def run(*args): for i in range(30): # send the message, then wait # so thread doesn’t exit and socket # isn’t closed ws.send('Hello %d' % i) time.sleep(1) time.sleep(1) ws.close() print('Thread terminating...') Thread(target=run).start()

on_open方法是用來(lái)保持連接的,上面這樣的一個(gè)例子,就是保持連接的一個(gè)過(guò)程,每隔一段時(shí)間就會(huì)來(lái)做一件事,他會(huì)在30s內(nèi)一直發(fā)送hello。最后停止。

on_close方法:

def on_close(ws): print('### closed ###')

onclose主要就是關(guān)閉socket連接的。

如何創(chuàng)建一個(gè)websocket應(yīng)用:

ws = websocket.WebSocketApp('wss://echo.websocket.org')

括號(hào)里面就是你要連接的socket的地址,在WebSocketApp這個(gè)實(shí)例化的方法里面還可以有其他參數(shù),這些參數(shù)就是我們剛剛介紹的這些方法。

ws = websocket.WebSocketApp('ws://echo.websocket.org/', on_message=on_message, on_error=on_error, on_close=on_close)

指定了這些參數(shù)之后就可以直接進(jìn)行調(diào)用了,例如:

ws.on_open = on_open

這樣就是調(diào)用了on_open方法

如果我們想讓我們的socket保持長(zhǎng)連接,一直連接著,就可以使用run_forever方法:

ws.run_forever()

完整代碼:

import websocketfrom threading import Threadimport timeimport sysdef on_message(ws, message): print(message)def on_error(ws, error): print(error)def on_close(ws): print('### closed ###')def on_open(ws): def run(*args): for i in range(3): # send the message, then wait # so thread doesn’t exit and socket # isn’t closed ws.send('Hello %d' % i) time.sleep(1) time.sleep(1) ws.close() print('Thread terminating...') Thread(target=run).start()if __name__ == '__main__': websocket.enableTrace(True) host = 'ws://echo.websocket.org/' ws = websocket.WebSocketApp(host,on_message=on_message,on_error=on_error,on_close=on_close) ws.on_open = on_open ws.run_forever()

如果想要通信一條短消息,并在完成后立即斷開(kāi)連接,我們可以使用短連接:

from websocket import create_connectionws = create_connection('ws://echo.websocket.org/')print('Sending ’Hello, World’...')ws.send('Hello, World')print('Sent')print('Receiving...')result = ws.recv()print('Received ’%s’' % result)ws.close()

關(guān)于websocket的介紹就到這兒了。

以上就是用Python進(jìn)行websocket接口測(cè)試的詳細(xì)內(nèi)容,更多關(guān)于python 接口測(cè)試的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 欧美激情自拍 | 国产精选莉莉私人影院 | 国产成人免费高清视频 | 欧美操操操| 黄色亚洲网站 | a级国产乱理伦片在线观看国 | 欧美操操操 | 天天干亚洲 | 久久久久成人精品一区二区 | 免费高清在线爱做视频 | 久久久精品久久久久久久久久久 | 91成人在线播放 | 国产三级a三级三级 | 91p在线 | 成 人 亚洲 综合天堂 | 欧美三级不卡视频 | 九九视频国产 | 国产日韩精品一区二区三区 | 美女脱了内裤张开腿让男人桶网站 | 一级毛片免费观看 | 在线免费观看日本视频 | 97精品在线视频 | 韩国三级日本三级香港三级黄 | 亚洲区免费| 久久99精品国产免费观看 | 亚洲一区二区精品 | 精品国产乱码久久久久久一区二区 | 日韩亚洲欧美综合一区二区三区 | 分享一个无毒不卡免费国产 | 国产成人精品综合在线 | 国产区91 | 91色老99久久九九爱精品 | 午夜视频在线观看一区二区 | a一级免费| 欧美亚洲国产精品久久高清 | 国产欧美日韩三级 | 97在线看| 手机在线毛片免费播放 | 精品国产欧美一区二区五十路 | 90岁老太婆一级毛片 | 欧美一级专区免费大片 |