欧姆龙CIP协议-Python client上位机
使用Python进行测试。这里简单写了一个注册-读取数据-注销的Python代码。在调试过程中,结合上文的状态码进行调试。
import socket
import struct
class EtherNetIPClient:
def __init__(self, host='192.168.1.5', port=44818):
self.host = host
self.port = port
self.handle = None
# self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def connect(self):
self.client_socket.connect((self.host, self.port))
print(f"成功连接到 {self.host}:{self.port}")
def register(self):
# 注册报文
register_message = bytes.fromhex(
'65 00 04 00 00 00 00 00 00 00 00 00 ff ab cd ff ff dc ba ff 00 00 00 00 01 00 00 00'
)
# 发送注册请求
self.client_socket.sendall(register_message)
print("已发送注册请求:", register_message.hex())
# 接收注册响应
response = self.client_socket.recv(1024)
print("接收到的注册响应:", response.hex())
# 提取句柄
if len(response) >= 8:
handle = response[4:8].hex() # 句柄为第5到第8字节
print("保存的句柄:", handle)
self.handle = handle
return handle # 返回句柄
return None
def unregister(self):
unregister_message = bytes.fromhex(
'66 00 00 00 57 01 56 00 00 00 00 00 ff ab cd ff ff dc ba ff 00 00 00 00'
)
unregister_message = bytearray(unregister_message)
# 将句柄插入到读取请求中
unregister_message[4:8] = bytes.fromhex(self.handle)
# 发送注册请求
self.client_socket.sendall(unregister_message)
print("已发送注销请求:", unregister_message.hex())
pass
def close(self):
self.client_socket.close()
print("client连接已关闭")
def parse_response(self,response):
# 确保响应至少有 4 个字节来读取长度字段
if len(response) < 4:
raise ValueError("Response is too short to even contain the length.")
# 第一步:读取前 4 个字节,获取总长度字段
header_length = struct.unpack(' # 检查是否接收到了足够的数据 if len(response) < header_length: raise ValueError(f"Response length {len(response)} is shorter than the expected total length: {header_length}.") # 第二步:读取剩下的 header,header 长度为 header_length rest_header_length = header_length - 4 # header_length 是总长度减去前面已经读取的 4 个字节 header = response[4:4 + rest_header_length] # 读取 header 数据 # 第三步:读取剩余的 Command Specific Data 部分 command_specific_data = response[header_length:] # 剩下的数据都是 command specific data # 现在我们需要解析 header 和 command specific data # 假设 header 的格式为 if len(header) >= 12: command, session_handle, status, options = struct.unpack(' else: raise ValueError("Header is too short to extract all fields.") # 将结果组织为字典 parsed_data = { 'Total Length': header_length, 'Header Length': rest_header_length, 'Command': command, 'Session Handle': session_handle, 'Status': status, 'Options': options, 'Command Specific Data': command_specific_data } return parsed_data def read_message(self, tag): # 封装读取报文,使用句柄和标签 header_bytes = bytearray([ # 前24字节是header 0x6F, 0x00, # A 读的报头,固定 0x18,0x00, # B command specific data + cip报文的长度,从G到S的长度(字节) 0x70, 0x01, 0x27, 0x00, # C 会话句柄 0x00, 0x00, 0x00, 0x00, # D 状态,固定 0xff, 0xab, 0xcd, 0xff,0xff, 0xdc, 0xba, 0xff, # E 发送方描述,固定 0x00, 0x00, 0x00, 0x00, # F 选项,固定 ]) command_specific_data_bytes = bytearray([ # 以下为 Command Specific Data 0x00,0x00,0x00,0x00, # G 接口句柄,固定 0x00, 0x00, # H 超时,固定 0x02, 0x00, # I 项数,固定 0x00, 0x00, # J 空地址项,固定 0x00, 0x00, # K 空地址项长度,固定 0xB2, 0x00, # L 未连接项,固定 0x08, 0x00, # M 后面CIP报文包的长度 ]) CIP_bytes = bytearray([ # 以下是CIP报文 0x4C, # N 服务标识,固定 0x00, # O 从P到R字长度(1字是4字节,四个0x00) 0x91, # P 扩展符号,固定 0x01, # Q 数据大小,就是tag字符串转byte的长度(字) # 0x00, 0x00, # R 数据名称,就是tag字符串,长度为Q # 0x01, 0x00, # S 服务命令指定数据,固定 ]) # 将句柄插入到读取请求中 header_bytes[4:8] = bytes.fromhex(self.handle) # 插入标签字节 # len(hex_tag) 是字节长度的 2 倍,除以 2 得到字节数 hex_tag = tag.encode('UTF-8').hex() # 将标签字符串转换为16进制 # 检查标签长度是否为奇数 if len(tag) % 2 != 0: hex_tag += '00' # 补充 '00' # 以alarm1为例,现在hex_tag为0791616c61726d31,length_hex为04(字) length_hex = '{:02x}'.format(len(hex_tag) // 4+1) # 将字符串长度添加到 request_message # 插入长度部分(必须是两位十六进制) CIP_bytes[1:2]= bytes.fromhex(length_hex) CIP_bytes[3:4]= bytes.fromhex('{:02x}'.format(len(hex_tag) // 2 if len(tag)% 2 ==0 else len(hex_tag) // 2 -1)) # tag的长度 # 插入 hex_tag 本身 CIP_bytes += bytes.fromhex(hex_tag) # 插入固定的服务命令指定数据 CIP_bytes += bytes.fromhex('0100') # 计算CIP报文长度 # CIP_length = len(hex_tag) // 2 + 4 CIP_length_hex = struct.pack(' # print("CIP_length_hex:", CIP_length_hex) command_specific_data_bytes[14:16] = bytes.fromhex(CIP_length_hex) # 计算command specific data的长度 command_specific_data_length = len(command_specific_data_bytes)+len(CIP_bytes) command_specific_data_length_hex = struct.pack(' # print("command_specific_data_length_hex:", command_specific_data_length_hex) header_bytes[2:4] = bytes.fromhex(command_specific_data_length_hex) print("header_bytes:", header_bytes.hex()) print("command_specific_data_bytes:", command_specific_data_bytes.hex()) print("CIP_bytes:", CIP_bytes.hex()) request_message = header_bytes + command_specific_data_bytes + CIP_bytes # 发送读取请求 self.client_socket.sendall(request_message) print("已发送读取请求:", request_message.hex()) # 接收响应 response = self.client_socket.recv(1024) print("接收到的响应:", response.hex()) # print(self.parse_response(response)) # # 解析返回报文 # command_code = response[0:2].hex() # length = response[2:4].hex() # session_handle = response[4:8].hex() # session_status = response[8:12].hex() # item_count = response[16:18].hex() # data_type = response[-4:-2].hex() # 数据类型 # data_value = response[-2:].hex() # 数据值 # print(f"命令码: {command_code}") # print(f"长度: {length}") # print(f"会话句柄: {session_handle}") # print(f"会话状态: {session_status}") # print(f"项数: {item_count}") # print(f"数据类型: {data_type}") # print(f"数据值: {data_value}") if __name__ == "__main__": client = EtherNetIPClient() client.connect() # 先连接 client.register() # 注册 if client.handle: # tag_name = "PB_ALARM_RST" # ok tag_name = "IND_MODE" # ok # tag_name = "AL.B[1]" # 指定标签名称 client.read_message( tag_name) # 使用句柄和标签读取数据 client.unregister() client.close() # 最后关闭连接