笔者最近在测试星火大模型的时候,他们是使用websocket 来建立对话,而且星火大模型开放的测试代码,质量上不咋地(20231030记录),还需要对websocket有一定的了解,才适合自己微调。
pip install websocket
pip install websocket-client
- 1 常见的websocket获取数据的方法
-
- 2 针对`run_forever`内容保存
-
参考【python: websocket获取实时数据的几种常见链接方式】常见的两种。
需要pip install websocket-client (此方法不建议使用,链接不稳定,容易断,并且连接很耗时)
import time
from websocket import create_connection
url = 'wss://i.cg.net/wi/ws'
while True: # 一直链接,直到连接上就退出循环
time.sleep(2)
ws = create_connection(url)
print(ws)
break
except Exception as e:
print('连接异常:', e)
continue
while True: # 连接上,退出第一个循环之后,此循环用于一直获取数据
ws.send('{"event":"subscribe", "channel":"btc_usdt.ticker"}')
response = ws.recv()
print(response)
import websocket
def on_message(ws, message): # 服务器有数据更新时,主动推送过来的数据
print(message)
def on_error(ws, error): # 程序报错时,就会触发on_error事件
print(error)
def on_close(ws):
print("Connection closed ……")
def on_open(ws): # 连接到服务器之后就会触发on_open事件,这里用于send数据
req = '{"event":"subscribe", "channel":"btc_usdt.deep"}'
print(req)
ws.send(req)
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://i.cg.net/wi/ws",
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever(ping_timeout=30)
第二种方式里面,run_forever
其实是流式返回内容,大概可以看,流式输出的样例:
{"code":0,"sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":0}
### error: 'content'
{"code":0,"fileRefer":"{\"43816997a7a44a299d0bfb7c360c5838\":[2,0,1]}","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":99}
### error: 'content'
{"code":0,"content":"橘","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}
橘{"code":0,"content":"子。","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}
子。{"code":0,"content":"","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":2}
### closed ###
那么run_forever
流式输出,正常的内容如何保存呢,进入下一章
参考【将Websocket数据保存到Pandas】
来看一下,文中的案例:
import json
import pandas as pd
import websocket
df = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side',
'size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID'])
def on_message(ws, message):
msg = json.loads(message)
print(msg)
global df
# `ignore_index=True` has to be provided, otherwise you'll get
# "Can only append a Series if ignore_index=True or if the Series has a name" errors
df = df.append(msg, ignore_index=True)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
return
if __name__ == "__main__":
ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",
on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)
ws.run_forever()
其中global df
是在定义全局变量df
,可以在函数中把流式数据拿出来,还是很不错的
在开源项目中ChuanhuChatGPT,看到了使用的方式spark.py,个人还没有尝试,只是贴在这里。
贴一下这个函数:
class CallbackToIterator:
def __init__(self):
self.queue = deque()
self.cond = Condition()
self.finished = False
def callback(self, result):
with self.cond:
self.queue.append(result)
self.cond.notify() # Wake up the generator.
def __iter__(self):
return self
def __next__(self):
with self.cond:
# Wait for a value to be added to the queue.
while not self.queue and not self.finished:
self.cond.wait()
if not self.queue:
raise StopIteration()
return self.queue.popleft()
def finish(self):
with self.cond:
self.finished = True
self.cond.notify() # Wake up the generator if it's waiting.
# 主函数截取
def get_answer_stream_iter(self):
wsParam = Ws_Param(self.appid, self.api_key, self.api_secret, self.spark_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(
wsUrl,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open,
ws.appid = self.appid
ws.domain = self.domain
# Initialize the CallbackToIterator
ws.iterator = CallbackToIterator()
# Start the WebSocket connection in a separate thread
thread.start_new_thread(
ws.run_forever, (), {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
# Iterate over the CallbackToIterator instance
answer = ""
total_tokens = 0
for message in ws.iterator:
data = json.loads(message)
code = data["header"]["code"]
if code != 0:
ws.close()
raise Exception(f"请求错误: {code}, {data}")
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
if "usage" in data["payload"]:
total_tokens = data["payload"]["usage"]["text"]["total_tokens"]
answer += content
if status == 2:
ws.iterator.finish() # Finish the iterator when the status is 2
ws.close()
yield answer, total_tokens
截取了部分代码,这里先是定义ws.iterator = CallbackToIterator()
然后通过迭代从for message in ws.iterator:
拿出数据,看上去也是可行的
使用websocket的Python到NANO网络API。
用法(简单)
在nanoAddress字段中输入要监视的地址。 该程序将收听RECEIVE付款。 您可以将其设置为收听RECIEVE,SEND,甚至两者都收听! 在while循环之后,只需在websocket接收命令之后更改逻辑即可!
用法(基于类)
要开始,只需将您的nano地址放入Data.json文件中并运行主文件。 该脚本是使用类,获取器和设置器构造的。 如果您喜欢这种方法,请更多使用此脚本。
我需要什么?
下载main.py文件和nodes.json文件。
在里面更改地址。
确保您点安装了requirements.txt文件(或手动安装)
Python库:
网络套接字
有关完整信息。 看看requests.txt!
第一种, 使用create_connection链接,需要pip install websocket-client (此方法不建议使用,链接不稳定,容易断,并且连接很耗时)
import time
from websocket import create_connection
url = 'wss://i.cg.net/wi/ws'
while True: # 一直链接,直到连接上就退出循环
time.sleep(2)
ws = create_connection(url)
print(ws)
break
except Exception as
____tz_zs
websocket 库在 0.48.0 版本后对回调进行了修改。
新版本中,当我们将一个实例对象的方法作为 WebSocketApp 的回调时,WebSocketApp 将不再会返回他自己作为回调的第一个参数。
普通方法作为 WebSocketApp 回调
以下为官方示例(https://pypi.org/project/websocket_client/)的长连接用法 Lon...
要在 FastAPI 中使用 WebSocket,需要使用 FastAPI 的 WebSocketEndpoint 类,该类是 WebSocket 协议的一个实现。以下是一个使用 FastAPI WebSocket 的简单示例:
```python
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"You said: {data}")
在上面的代码中,定义了一个 WebSocket 端点 `/ws`,并使用 `async def` 定义了一个异步函数 `websocket_endpoint`。该函数的形参是一个 WebSocket,它将在连接建立时自动创建。
使用 `await websocket.accept()` 方法来接受 WebSocket 的连接请求,并在之后的循环中等待从客户端接收到的数据。使用 `await websocket.receive_text()` 方法接收数据,并使用 `await websocket.send_text()` 方法向客户端发送响应。
可以使用 Python 的 WebSocket 客户端来测试这个 WebSocket 端点,比如使用 `websockets` 库:
```python
import asyncio
import websockets
async def send_message():
async with websockets.connect('ws://localhost:8000/ws') as websocket:
await websocket.send('Hello')
response = await websocket.recv()
print(response)
asyncio.get_event_loop().run_until_complete(send_message())
在上面的代码中,使用 `websockets.connect()` 方法连接到 WebSocket 端点,发送一个消息,并等待回复。最后,使用 `print()` 方法打印回复。
losectrl:
python︱写markdown一样写网页,代码快速生成web工具:streamlit介绍(一)