欧姆龙CIP协议-Python client上位机

使用Python进行测试。这里简单写了一个注册-读取数据-注销的Python代码。在调试过程中,结合上文的状态码进行调试。

import socket

import struct

class EtherNetIPClient:

def __init__(self, host='192.168.1.5', port=44818):

self.host = host

self.port = port

self.handle = None

# self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def connect(self):

self.client_socket.connect((self.host, self.port))

print(f"成功连接到 {self.host}:{self.port}")

def register(self):

# 注册报文

register_message = bytes.fromhex(

'65 00 04 00 00 00 00 00 00 00 00 00 ff ab cd ff ff dc ba ff 00 00 00 00 01 00 00 00'

)

# 发送注册请求

self.client_socket.sendall(register_message)

print("已发送注册请求:", register_message.hex())

# 接收注册响应

response = self.client_socket.recv(1024)

print("接收到的注册响应:", response.hex())

# 提取句柄

if len(response) >= 8:

handle = response[4:8].hex() # 句柄为第5到第8字节

print("保存的句柄:", handle)

self.handle = handle

return handle # 返回句柄

return None

def unregister(self):

unregister_message = bytes.fromhex(

'66 00 00 00 57 01 56 00 00 00 00 00 ff ab cd ff ff dc ba ff 00 00 00 00'

)

unregister_message = bytearray(unregister_message)

# 将句柄插入到读取请求中

unregister_message[4:8] = bytes.fromhex(self.handle)

# 发送注册请求

self.client_socket.sendall(unregister_message)

print("已发送注销请求:", unregister_message.hex())

pass

def close(self):

self.client_socket.close()

print("client连接已关闭")

def parse_response(self,response):

# 确保响应至少有 4 个字节来读取长度字段

if len(response) < 4:

raise ValueError("Response is too short to even contain the length.")

# 第一步:读取前 4 个字节,获取总长度字段

header_length = struct.unpack('

# 检查是否接收到了足够的数据

if len(response) < header_length:

raise ValueError(f"Response length {len(response)} is shorter than the expected total length: {header_length}.")

# 第二步:读取剩下的 header,header 长度为 header_length

rest_header_length = header_length - 4 # header_length 是总长度减去前面已经读取的 4 个字节

header = response[4:4 + rest_header_length] # 读取 header 数据

# 第三步:读取剩余的 Command Specific Data 部分

command_specific_data = response[header_length:] # 剩下的数据都是 command specific data

# 现在我们需要解析 header 和 command specific data

# 假设 header 的格式为 (这是个示例)

if len(header) >= 12:

command, session_handle, status, options = struct.unpack('

else:

raise ValueError("Header is too short to extract all fields.")

# 将结果组织为字典

parsed_data = {

'Total Length': header_length,

'Header Length': rest_header_length,

'Command': command,

'Session Handle': session_handle,

'Status': status,

'Options': options,

'Command Specific Data': command_specific_data

}

return parsed_data

def read_message(self, tag):

# 封装读取报文,使用句柄和标签

header_bytes = bytearray([

# 前24字节是header

0x6F, 0x00, # A 读的报头,固定

0x18,0x00, # B command specific data + cip报文的长度,从G到S的长度(字节)

0x70, 0x01, 0x27, 0x00, # C 会话句柄

0x00, 0x00, 0x00, 0x00, # D 状态,固定

0xff, 0xab, 0xcd, 0xff,0xff, 0xdc, 0xba, 0xff, # E 发送方描述,固定

0x00, 0x00, 0x00, 0x00, # F 选项,固定

])

command_specific_data_bytes = bytearray([

# 以下为 Command Specific Data

0x00,0x00,0x00,0x00, # G 接口句柄,固定

0x00, 0x00, # H 超时,固定

0x02, 0x00, # I 项数,固定

0x00, 0x00, # J 空地址项,固定

0x00, 0x00, # K 空地址项长度,固定

0xB2, 0x00, # L 未连接项,固定

0x08, 0x00, # M 后面CIP报文包的长度

])

CIP_bytes = bytearray([

# 以下是CIP报文

0x4C, # N 服务标识,固定

0x00, # O 从P到R字长度(1字是4字节,四个0x00)

0x91, # P 扩展符号,固定

0x01, # Q 数据大小,就是tag字符串转byte的长度(字)

# 0x00, 0x00, # R 数据名称,就是tag字符串,长度为Q

# 0x01, 0x00, # S 服务命令指定数据,固定

])

# 将句柄插入到读取请求中

header_bytes[4:8] = bytes.fromhex(self.handle)

# 插入标签字节

# len(hex_tag) 是字节长度的 2 倍,除以 2 得到字节数

hex_tag = tag.encode('UTF-8').hex() # 将标签字符串转换为16进制

# 检查标签长度是否为奇数

if len(tag) % 2 != 0:

hex_tag += '00' # 补充 '00'

# 以alarm1为例,现在hex_tag为0791616c61726d31,length_hex为04(字)

length_hex = '{:02x}'.format(len(hex_tag) // 4+1)

# 将字符串长度添加到 request_message

# 插入长度部分(必须是两位十六进制)

CIP_bytes[1:2]= bytes.fromhex(length_hex)

CIP_bytes[3:4]= bytes.fromhex('{:02x}'.format(len(hex_tag) // 2 if len(tag)% 2 ==0 else len(hex_tag) // 2 -1)) # tag的长度

# 插入 hex_tag 本身

CIP_bytes += bytes.fromhex(hex_tag)

# 插入固定的服务命令指定数据

CIP_bytes += bytes.fromhex('0100')

# 计算CIP报文长度

# CIP_length = len(hex_tag) // 2 + 4

CIP_length_hex = struct.pack('

# print("CIP_length_hex:", CIP_length_hex)

command_specific_data_bytes[14:16] = bytes.fromhex(CIP_length_hex)

# 计算command specific data的长度

command_specific_data_length = len(command_specific_data_bytes)+len(CIP_bytes)

command_specific_data_length_hex = struct.pack('

# print("command_specific_data_length_hex:", command_specific_data_length_hex)

header_bytes[2:4] = bytes.fromhex(command_specific_data_length_hex)

print("header_bytes:", header_bytes.hex())

print("command_specific_data_bytes:", command_specific_data_bytes.hex())

print("CIP_bytes:", CIP_bytes.hex())

request_message = header_bytes + command_specific_data_bytes + CIP_bytes

# 发送读取请求

self.client_socket.sendall(request_message)

print("已发送读取请求:", request_message.hex())

# 接收响应

response = self.client_socket.recv(1024)

print("接收到的响应:", response.hex())

# print(self.parse_response(response))

# # 解析返回报文

# command_code = response[0:2].hex()

# length = response[2:4].hex()

# session_handle = response[4:8].hex()

# session_status = response[8:12].hex()

# item_count = response[16:18].hex()

# data_type = response[-4:-2].hex() # 数据类型

# data_value = response[-2:].hex() # 数据值

# print(f"命令码: {command_code}")

# print(f"长度: {length}")

# print(f"会话句柄: {session_handle}")

# print(f"会话状态: {session_status}")

# print(f"项数: {item_count}")

# print(f"数据类型: {data_type}")

# print(f"数据值: {data_value}")

if __name__ == "__main__":

client = EtherNetIPClient()

client.connect() # 先连接

client.register() # 注册

if client.handle:

# tag_name = "PB_ALARM_RST" # ok

tag_name = "IND_MODE" # ok

# tag_name = "AL.B[1]" # 指定标签名称

client.read_message( tag_name) # 使用句柄和标签读取数据

client.unregister()

client.close() # 最后关闭连接