import serial
from serial import threaded
import time
import numpy as np
from enum import Enum
import traceback
import csv
"""
A python module to control the CubeMars AK series actuators over the serial port.
"""
Servo_Params_Serial = {
'AK80-9': {
'Type' : 'AK80-9', # name of motor to print out in diagnostics
'P_min' : -58.85, # rad (-58.85 rad limit)
'P_max' : 58.85, # rad (58.85 rad limit)
'V_min' : -20.0, # rad/s (-58.18 rad/s limit)
'V_max' : 20.0, # rad/s (58.18 rad/s limit)
'Curr_min' : -15.0,# A (-60A is the acutal limit)
'Curr_max' : 15.0, # A (60A is the acutal limit)
'Temp_max' : 40.0, # max mosfet temp in deg C
'Kt': 0.115, # Nm before gearbox per A of qaxis current
'GEAR_RATIO': 9, # 9:1 gear ratio
'NUM_POLE_PAIRS' : 21 # 21 pole pairs
}
}
"""
Dictionary with the default parameters for the motors, indexed by their name
Parameters:
Type (str): the name of this type of motor (ie, AK##-##)
P_min (float): Minimum position in radians
P_max (float): Maximum position in radians
V_min (float): Minimum speed command in velocity control in rad/s
V_max (float): Maximum speed command in velocity control in rad/s
Curr_min (float): Minimum current command during current control in A
Curr_max (float): Maximum current command during current control in A
Temp_max (float): Temperature above which motor should be turned off in Celsius
Kt (float): Torque constant of motor in Nm/A, before gearbox, q-axis
GEAR_RATIO (int): The gear ratio of the motor "n" as in n:1
NUM_POLE_PAIRS (int): Number of pole pairs in the motor
"""
crc16_tab = [0x0000, 0x1021, 0x2042,0x3063, 0x4084,
0x50a5, 0x60c6, 0x70e7, 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c,0xd1ad,
0xe1ce, 0xf1ef, 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294,0x72f7,
0x62d6, 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff,0xe3de,
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,0xa56a,
0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, 0x3653,0x2672,
0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, 0xb75b, 0xa77a,0x9719,
0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, 0x48c4, 0x58e5, 0x6886,0x78a7,
0x0840, 0x1861, 0x2802, 0x3823, 0xc9cc, 0xd9ed, 0xe98e, 0xf9af,0x8948,
0x9969, 0xa90a, 0xb92b, 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71,0x0a50,
0x3a33, 0x2a12, 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58,0xbb3b,
0xab1a, 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60,0x1c41,
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,0x7e97,
0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, 0xff9f,0xefbe,
0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, 0x9188, 0x81a9,0xb1ca,
0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, 0x1080, 0x00a1, 0x30c2,0x20e3,
0x5004, 0x4025, 0x7046, 0x6067, 0x83b9, 0x9398, 0xa3fb, 0xb3da,0xc33d,
0xd31c, 0xe37f, 0xf35e, 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235,0x5214,
0x6277, 0x7256, 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f,0xd52c,
0xc50d, 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424,0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,0x26d3,
0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, 0xd94c,0xc96d,
0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, 0x5844, 0x4865,0x7806,
0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, 0xcb7d, 0xdb5c, 0xeb3f,0xfb1e,
0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, 0x4a75, 0x5a54, 0x6a37, 0x7a16,0x0af1,
0x1ad0, 0x2ab3, 0x3a92, 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa,0xad8b,
0x9de8, 0x8dc9, 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83,0x1ce0,
0x0cc1, 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9,0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0]
"""
CRC 16 table as given in the manual, to be used to verify the packets recived
"""
PARAMETER_FLAGS = {
# parameter : flag
'MOSFET_TEMP' : 1,
'MOTOR_TEMP' : 1 << 2,
'OUTPUT_CURRENT' : 1 << 3,
'INPUT_CURRENT' : 1 << 4,
'D_CURRENT' : 1 << 5,
'Q_CURRENT' : 1 << 6,
'DUTY_CYCLE' : 1 << 7,
'MOTOR_SPEED' : 1 << 8,
'INPUT_VOLTAGE' : 1 << 9,
'MOTOR_ERROR_FLAG': 1 << 16,
'MOTOR_POSITION' : 1 << 17,
'MOTOR_ID' : 1 << 18
}
"""
A dictionary of flags, where 1 means enable the desired parameter during feedback, and 0 means disable.
"""
ERROR_CODES = {
0 : 'FAULT_CODE_NONE',
1 : 'FAULT_CODE_OVER_VOLTAGE',
2 : 'FAULT_CODE_UNDER_VOLTAGE',
3 : 'FAULT_CODE_DRIVE',
4 : 'FAULT_CODE_ABS_OVER_CURRENT',
5 : 'FAULT_CODE_OVER_TEMP_FET',
6 : 'FAULT_CODE_OVER_TEMP_MOTOR',
7 : 'FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE',
8 : 'FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE',
9 : 'FAULT_CODE_MCU_UNDER_VOLTAGE',
10 : 'FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET',
11 : 'FAULT_CODE_ENCODER_SPI',
12 : 'FAULT_CODE_ENCODER_SINCOS_BELOW_MIN_AMPLITUDE',
13 : 'FAULT_CODE_ENCODER_SINCOS_ABOVE_MAX_AMPLITUDE',
14 : 'FAULT_CODE_FLASH_CORRUPTION',
15 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_1',
16 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_2',
17 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_3',
18 : 'FAULT_CODE_UNBALANCED_CURRENTS',
}
"""
A dictionary mapping error code numbers to the name of the error code
"""
[docs]class COMM_PACKET_ID():
"""
Basically used as ENUM to identify what each packet ID does
"""
COMM_FW_VERSION = 0
COMM_JUMP_TO_BOOTLOADER = 1
COMM_ERASE_NEW_APP = 2
COMM_WRITE_NEW_APP_DATA = 3
COMM_GET_VALUES = 4
COMM_SET_DUTY = 5
COMM_SET_CURRENT = 6
COMM_SET_CURRENT_BRAKE = 7
COMM_SET_RPM = 8
COMM_SET_POS = 9
COMM_SET_HANDBRAKE = 10
COMM_SET_DETECT = 11
# COMM_GET_POS = 16
COMM_ROTOR_POSITION = 22
COMM_GET_VALUES_SETUP = 50
COMM_SET_POS_SPD = 91
COMM_SET_POS_MULTI = 92
COMM_SET_POS_SINGLE = 93
COMM_SET_POS_UNLIMITED = 94
COMM_SET_POS_ORIGIN = 95
[docs]def buffer_append_int16(buffer, number):
"""
split a 16 bit signed integer into 2 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 8)&(0x00FF))
buffer.append((number)&(0x00FF))
# Buffer allocation for unsigned 16 bit
[docs]def buffer_append_uint16( buffer,number):
"""
split a 16 bit unsigned integer into 2 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 8)&(0x00FF))
buffer.append((number)&(0x00FF))
# Buffer allocation for 32 bit
[docs]def buffer_append_int32(buffer,number):
"""
split a 32 bit signed integer into 4 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 24)&(0x000000FF))
buffer.append((number >> 16)&(0x000000FF))
buffer.append((number >> 8)&(0x000000FF))
buffer.append((number)&(0x000000FF))
# Buffer allocation for 32 bit
[docs]def buffer_append_uint32( buffer,number):
"""
split a 32 bit unsigned integer into 4 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 24)&(0x000000FF))
buffer.append((number >> 16)&(0x000000FF))
buffer.append((number >> 8)&(0x000000FF))
buffer.append((number)&(0x000000FF))
# Buffer allocation for 64 bit
[docs]def buffer_append_int64( buffer,number):
"""
split a 64 bit signed integer into 8 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 56)&(0x00000000000000FF))
buffer.append((number >> 48)&(0x00000000000000FF))
buffer.append((number >> 40)&(0x00000000000000FF))
buffer.append((number >> 31)&(0x00000000000000FF))
buffer.append((number >> 24)&(0x00000000000000FF))
buffer.append((number >> 16)&(0x00000000000000FF))
buffer.append((number >> 8)&(0x00000000000000FF))
buffer.append((number)&(0x00000000000000FF))
# Buffer allocation for Unsigned 64 bit
[docs]def buffer_append_uint64(buffer,number):
"""
split a 64 bit unsigned integer into 8 bytes and append to buffer.
Args:
Buffer: memory allocated to store data.
number: value.
"""
buffer.append((number >> 56)&(0x00000000000000FF))
buffer.append((number >> 48)&(0x00000000000000FF))
buffer.append((number >> 40)&(0x00000000000000FF))
buffer.append((number >> 31)&(0x00000000000000FF))
buffer.append((number >> 24)&(0x00000000000000FF))
buffer.append((number >> 16)&(0x00000000000000FF))
buffer.append((number >> 8)&(0x00000000000000FF))
buffer.append((number)&(0x00000000000000FF))
[docs]def buffer_get_int8(buffer, ind):
"""
Grab the 8 bit integer at data[ind]
Args:
buffer: array with bytes of data
ind: location to index
Returns:
8 bit integer at data[ind]
"""
return np.int8((buffer[ind]))
[docs]def buffer_get_int16(buffer, ind):
"""
Grab the 16 bit integer at data[ind:ind+1]
Args:
buffer: array with bytes of data
ind: location to index
Returns:
16 bit integer at data[ind:ind+1]
"""
return np.int16((np.uint8(buffer[ind]) << 8) | np.uint8(buffer[ind+1]))
[docs]def buffer_get_int32(buffer, ind):
"""
Grab the 32 bit integer at data[ind:ind+3]
Args:
buffer: array with bytes of data
ind: location to index
Returns:
32-bit signed integer at data[ind:ind+3]
"""
return np.int32((np.uint8(buffer[ind]) << 24) | (np.uint8(buffer[ind+1]) << 16) | (np.uint8(buffer[ind+2]) << 8) | np.uint8(buffer[ind+3]))
[docs]def crc16(data, DL):
"""
Calculate the crc16 value for the given data array and data length.
This is just translated to python from the C code in the manual on the cubmars website.
Args:
data: array of data to creat checksum value for
DL: data length
Returns:
16-bit integer checksum.
"""
cksum = np.uint16(0)
for i in range(DL):
cksum = crc16_tab[((cksum >> 8) ^ (data[i])) & 0xFF] ^ (cksum << 8)
return np.uint16(cksum)
[docs]def create_packet(data):
"""
Packages the data into a packet to be sent.
Args:
data: array of N+1 bytes of data, [packet id, data_1, ..., data_N]
Returns:
packet: array in the format, [0x02, data length, packet id, data_1, ..., data_N, crc_1, crc_2, 0x03]
"""
packet = []
DL = len(data)
if (DL > 256):
raise RuntimeError("Tried to send packet longer than 256 bytes!")
else:
packet.append(0x02)
packet.append(DL)
packet += data
crc = crc16(data, DL)
packet.append(np.uint8(crc >> 8))
packet.append(np.uint8(crc & 0xFF))
packet.append(0x03)
return packet
[docs]def parse_packet(packet):
"""
Check that the packet makes sense, and get the data out of it.
Args:
packet: array in the format [0x02, data length, packet id, data_1, ..., data_N, crc_1, crc_2, 0x03]
Returns:
data: Just the packet id and actual data from the recieved packet, [packet id, data_1, ..., data_N]
None: returns None if the packet failed to successfully parse
"""
if len(packet) > 4:
header = packet[0]
DL = packet[1]
data = packet[2:2+DL]
crc = buffer_get_int16(packet[2+DL:DL+4], 0)
if crc == crc16(data, DL):
return data
else:
return None
else:
return None
[docs]class servo_serial_motor_state:
"""
An object representing the state of the motor
"""
[docs] def __init__(self):
"""
Initialize the motor state to zero.
"""
self.mos_temperature = 0
self.motor_temperature = 0
self.output_current = 0
self.input_current = 0
self.id_current = 0
self.iq_current = 0
self.duty = 0
self.speed = 0
self.input_voltage = 0
self.position_set = 0
self.controlID = 0
self.Vd = 0
self.Vq = 0
self.error = 0
self.acceleration = 0
self.position = 0
[docs] def set_state(self,
mos_temperature = None,
motor_temperature = None,
output_current = None,
input_current = None,
id_current = None,
iq_current = None,
duty = None,
speed = None,
input_voltage = None,
position_set = None,
controlID = None,
Vd = None,
Vq = None,
error = None,
acceleration = None,
position = None):
"""
Set the motor state based on input to the function. If any field is not specified,
then that field will not be altered.
"""
self.mos_temperature = mos_temperature if not (mos_temperature is None) else self.mos_temperature
self.motor_temperature = motor_temperature if not (motor_temperature is None) else self.motor_temperature
self.output_current = output_current if not (output_current is None) else self.output_current
self.input_current = input_current if not (input_current is None) else self.input_current
self.id_current = id_current if not (id_current is None) else self.id_current
self.iq_current = iq_current if not (iq_current is None) else self.iq_current
self.duty = duty if not (duty is None) else self.duty
self.speed = speed if not (speed is None) else self.speed
self.input_voltage = input_voltage if not (input_voltage is None) else self.input_voltage
self.position_set = position_set if not (position_set is None) else self.position_set
self.controlID = controlID if not (controlID is None) else self.controlID
self.Vd = Vd if not (Vd is None) else self.Vd
self.Vq = Vq if not (Vq is None) else self.Vq
self.error = error if not (error is None) else self.error
self.acceleration = acceleration if not (acceleration is None) else self.acceleration
self.position = position if not (position is None) else self.position
def __str__(self):
"""
String showing each of the fields in the motor state.
"""
s = f'Mos Temp: {self.mos_temperature}'
s += f'\nMotor Temp: {self.motor_temperature}'
s += f'\nOutput Current: {self.output_current}'
s += f'\nInput Current: {self.input_current}'
s += f'\nid current: {self.id_current}'
s += f'\niq current: {self.iq_current}'
s += f'\nduty: {self.duty}'
s += f'\nspeed: {self.speed}'
s += f'\ninput voltage: {self.input_voltage}'
s += f'\nposition: {self.position_set}'
s += f'\ncontrolID: {self.controlID}'
s += f'\nVd: {self.Vd}'
s += f'\nVq: {self.Vq}'
s += f'\nAccel: {self.acceleration}'
return s
# possible states for the controller
[docs]class SERVO_SERIAL_CONTROL_STATE(Enum):
"""
An Enum to keep track of different control states
"""
DUTY_CYCLE = 0
CURRENT_LOOP = 1
CURRENT_BRAKE = 2
VELOCITY = 3
POSITION = 4
HANDBRAKE = 5
POSITION_VELOCITY = 6
IDLE = 7
[docs]class motor_listener(serial.threaded.Protocol):
"""
Implements the pyserial "Protocol" class to handle messages asynchronously
TODO when pyserial implmements asyncio support, switch to that
"""
[docs] def connection_made(self, transport):
"""
Could add other things to happen here on connection initialization.
Args:
transport: the connection
"""
super().connection_made(transport)
[docs] def connection_lost(self, transport):
"""
Could add other things to happen here on connection termination.
Args:
transport: the connection
"""
super().connection_made(transport)
[docs] def __init__(self):
"""
Initializes the class, including the state machine variables and a reference
to the motor manager in the main thread that this class will update.
"""
super().__init__()
self.buffer =[]
self.state = 0 # 0 : ready, 1 : message started (0x02), 2 : known DL, 3 : ready data, 5 : expect 0x03
self.DL = 0
self.i = 0
self.motor = None
[docs] def data_received(self, data):
"""
Handle data that's been recieved, by stepping through a state machine one byte at a time.
States:
0: expecting 0x02 for beginning of next message
1: expecting data length field
2: will keep reading data until DL + 2
3: expecting 0x03 for ending of this message
Args:
data: array of received data to parse
"""
# print(f"\n {len(data)}")
for d in data:
if self.state == 0:
if d == 0x02:
self.buffer.append(d)
self.state = 1
elif self.state == 1:
self.buffer.append(d)
self.DL = d
self.state = 2
elif self.state == 2:
self.buffer.append(d)
self.i += 1
if self.i == self.DL + 2:
self.state = 3
self.i = 0
self.DL = 0
elif self.state == 3:
if d == 0x03:
self.buffer.append(d)
self.handle_packet(self.buffer)
self.state = 0
self.buffer = []
[docs] def handle_packet(self, packet):
"""
Called whenever the state machine finishes reading a packet,
to update motor manager object.
Args:
packet: array of data to parse.
"""
if self.motor is not None:
# print(packet)
DL = packet[1]
data = packet[2:2+DL]
crc = buffer_get_int16(packet[2+DL:DL+4], 0)
if crc != crc16(data, DL):
data = None
if not (data is None) and len(data) > 0:
self.motor.update_async(data)
# the user-facing class that manages the motor.
[docs]class TMotorManager_servo_serial():
"""
The user-facing class that manages the motor. This class should be
used in the "context" of a with as block, in order to safely enter/exit
control of the motor.
"""
[docs] def __init__(self, port = '/dev/ttyUSB0', baud=961200, motor_params=Servo_Params_Serial['AK80-9'], max_mosfett_temp = 50,):
"""
Initialize the motor manager. Note that this will not turn on the motor,
until __enter__ is called (automatically called in a with block)
Args:
port: the name of the serial port to connect to (ie, /dev/ttyUSB0, COM3, etc)
baud: the baud rate to use for connection. Should always be 961200 as far as I can tell.
motor_params: A parameter dictionary defining the motor parameters, as defined above.
max_mosfett_temp: Temperature of the mosfett above which to throw an error, in Celsius
"""
self.motor_params = motor_params
self.port = port
self.baud = baud
print("Initializing device: " + self.device_info_string())
self._motor_state = servo_serial_motor_state()
self._motor_state_async = servo_serial_motor_state()
self._command = None # overwrite with byte array of command to send on update()
self._control_state = None
self.max_temp = max_mosfett_temp # max temp in deg C, can update later
# TODO verify these work for other motor types!
self.radps_per_ERPM =2*np.pi/180/60 # 5.82E-04
self.rad_per_Eang = 2*(np.pi/180)/(self.motor_params['NUM_POLE_PAIRS']) # 1.85e-4
self._entered = False
self._start_time = time.time()
self._last_update_time = self._start_time
self._updated_async = False
self._updated = False
self._ser = None
self._reader_thread = None
[docs] def __enter__(self):
"""
Used to safely power the motor on.
"""
if not self._entered:
# begin serial connection
self._ser = serial.Serial(
self.port,
self.baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=0.0001,
inter_byte_timeout=0.001
)
# start another thread to handle recieved data
self._reader_thread = serial.threaded.ReaderThread(self._ser, motor_listener)
self._listener = self._reader_thread.__enter__()
self._listener.motor = self
# send startup sequence
self.power_on()
self.send_command()
# tell the motor to send back all parameters
# TODO expand to allow user to only request some data, could be faster
self.set_motor_parameter_return_format_all()
self.send_command()
# tell motor to send current position (for some reason current position is not in the other parameters)
self.begin_position_feedback()
self.send_command()
# don't do anything else
self.enter_idle_mode()
self.send_command()
if not self.check_connection():
print("device: {self.device_info_string()} not connected!")
else:
print(f"device: {self.device_info_string()} successfully connected.")
# return the safely entered motor manager object
self._entered = True
return self
else:
# it's already been entered!
print(f"Already entered device: {self.device_info_string()}")
[docs] def __exit__(self, etype, value, tb):
"""
Used to safely power the motor off and close the reader thread and serial port.
"""
print('\nTurning off control for device: ' + self.device_info_string())
# end reading thread
self._reader_thread.stop()
# power down motor
self._ser.write(self.set_duty_cycle(0.0))
# end serial connection
self._ser.close()
# if this was ended due to an error, print the stack trace
if not (etype is None):
traceback.print_exception(etype, value, tb)
[docs] def check_connection(self):
"""
For now, just sends some parameter read commands and waits 0.2 seconds to
see if we got a response.
"""
self._send_specific_command(self.get_motor_parameters())
self._send_specific_command(self.get_motor_parameters())
self._send_specific_command(self.get_motor_parameters())
# slight delay to ensure connection!
time.sleep(0.2)
return self._updated_async
[docs] def update_async(self, data):
"""
Update the asynchronous motor state. Called by a reader thread.
Args:
data: An array of N bytes of data to parse, [packet id, data_1, ..., data_N]
Raises:
RuntimeError: If the packet recieved contains an error code.
"""
self._updated_async = True
packet_ID = data[0]
if (packet_ID in [COMM_PACKET_ID.COMM_GET_VALUES, COMM_PACKET_ID.COMM_GET_VALUES_SETUP]):
self.parse_motor_parameters_async(data)
# calculate acceleration
now = time.time()
dt = now - self._last_update_time
self._motor_state_async.acceleration = self._motor_state_async.speed/dt
self._last_update_time = now
elif (packet_ID == COMM_PACKET_ID.COMM_ROTOR_POSITION):
self.parse_position_feedback_async(data)
else:
print("weird packet")
# elif (packet_ID == COMM_PACKET_ID.COMM_SET_POS):
# self.parse_set_position_feedback_async(data)
if self._motor_state.error != 0:
raise RuntimeError(ERROR_CODES[self._motor_state.error])
# comm data parsing
[docs] def parse_position_feedback_async(self, data):
"""
Update this motor's asynch position based on recieved data
Args:
data: The data array to parse the position from.
"""
self._motor_state_async.position = -float(buffer_get_int32(data, 1))/1000.0
[docs] def parse_set_position_feedback_async(self, data):
"""
Update this motor's asynch position based on recieved data.
Args:
data: The data array to parse the position from.
"""
# not sure why the constant varies here, maybe the streaming data is more accurate?
self._motor_state_async.position = -float(buffer_get_int32(data, 1))/1000000.0
[docs] def parse_motor_parameters_async(self, data):
"""
Update this motor's asynch state (except position) based on received data
Args:
data: The data array to parse the parameters from
"""
i = 1
self._motor_state_async.mos_temperature = float(buffer_get_int16(data,i))/10.0
i+=2
self._motor_state_async.motor_temperature = float(buffer_get_int16(data,i))/10.0
i+=2
self._motor_state_async.output_current = float(buffer_get_int32(data,i))/100.0
i+=4
self._motor_state_async.input_current = float(buffer_get_int32(data,i))/100.0
i+=4
self._motor_state_async.id_current = float(buffer_get_int32(data,i))/100.0
i+=4
self._motor_state_async.iq_current = float(buffer_get_int32(data,i))/100.0
i+=4
self._motor_state_async.duty = float(buffer_get_int16(data,i))/1000.0
i+=2
self._motor_state_async.speed = float(buffer_get_int32(data,i))
i+=4
self._motor_state_async.input_voltage = float(buffer_get_int16(data,i))/10.0
i+=2 + 24
# TODO investigate what's in the 24 reserved bytes? Maybe it's interesting to record?
self._motor_state_async.error = np.uint(data[i])
i+=1
self._motor_state_async.position_set = float(buffer_get_int32(data,i))/1000000.0
i+=4
self._motor_state_async.controlID = np.uint(data[i])
i+=1 + 6
self._motor_state_async.Vd = float(buffer_get_int32(data,i))/1000.0
i+=4
self._motor_state_async.Vq = float(buffer_get_int32(data,i))/1000.0
i+=4
[docs] def send_command(self):
"""
Sends the current command that the user has specified.
"""
if not (self._command is None):
# use the thread-safe method to write the command, so that we don't access the serial
# object while the reader thread is using it!!
# TODO when pyserial adds full asyncio support, consider switching to that
self._reader_thread.write(self._command)
[docs] def _send_specific_command(self, command):
"""
Sends the specified command rather than the current value of <this object>._command
Args:
command: bytearray with the command to send.
"""
if not (self._command is None):
# use the thread-safe method to write the command, so that we don't access the serial
# object while the reader thread is using it!!
# TODO when pyserial adds full asyncio support, consider switching to that
self._reader_thread.write(command)
[docs] def update(self):
"""
Synchronizes the current motor state with the asynchronously updated state.
Sends the current motor command
Sends the command to get parameter feedback
Raises:
RuntimeError: if this method is called before the motor is entered.
"""
if not self._entered:
raise RuntimeError("Tried to update motor state before safely powering on for device: " + self.device_info_string())
if self.get_temperature_celsius() > self.max_temp:
raise RuntimeError("Temperature greater than {}C for device: {}".format(self.max_temp, self.device_info_string()))
# send the user specified command
self.send_command()
# send the command to get parameters (message will be read in other thread)
self._send_specific_command(self.get_motor_parameters())
# synchronize user-facing state with most recent async state
# TODO implement some filtering on the async state, if it gets multiple updates
# between user requested updates
self._motor_state = self._motor_state_async
# comm protocol commands
[docs] def power_on(self):
"""
Send the startup sequence command. Not sure why it's like this, but the
command is [0x40, 0x80, 0x20, 0x02, 0x21, 0xc0]
"""
self._command = bytearray([0x40, 0x80, 0x20, 0x02, 0x21, 0xc0])
self.send_command()
[docs] def power_off(self):
"""
There is no official power off command that I can see, so this will
set the duty cycle to 0.0
"""
self.set_duty_cycle(0.0)
self.send_command()
[docs] def enter_idle_mode(self):
"""
Set the control state to IDLE and current command to none.
In this mode, no command will be sent.
"""
self._command = None
self._control_state = SERVO_SERIAL_CONTROL_STATE.IDLE
[docs] def enter_velocity_control(self):
"""
Set the control state to VELOCITY
In this mode, you can send a certain motion speed to motor
"""
self._control_state = SERVO_SERIAL_CONTROL_STATE.VELOCITY
[docs] def enter_position_control(self):
"""
Set the control state to POSITION
In this mode, you can send a certain position to motor, the motor will run to the specified
position, (default speed 12000erpm acceleration 40000erpm)
"""
self._control_state = SERVO_SERIAL_CONTROL_STATE.POSITION
[docs] def enter_position_velocity_control(self):
"""
Set the control state to POSITION_VELOCITY
In this mode, you can send a certain position, speed and acceleration to motor.
The motor will run at a given acceleration and maximum speed to a specified
position.
"""
self._control_state = SERVO_SERIAL_CONTROL_STATE.POSITION_VELOCITY
[docs] def enter_current_control(self):
"""
Set the control state to CURENT_LOOP
In this mode, you can send an Iq current to motor, the motor output torque = Iq *KT, so it can
be used as a torque loop
"""
self._control_state = SERVO_SERIAL_CONTROL_STATE.CURRENT_LOOP
[docs] def enter_duty_cycle_control(self):
"""
Set the control state to DUTY_CYCLE
In this mode, you can send a certain duty cycle voltage to motor
"""
self._control_state = SERVO_SERIAL_CONTROL_STATE.DUTY_CYCLE
[docs] def comm_set_duty_cycle(self, duty, set_command=True):
"""
send a certain duty cycle voltage to motor
Args:
duty: -1.0 to 1.0 duty cycle to use
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
buffer=[]
buffer_append_int32(buffer, int(duty * 100000.0))
data = [COMM_PACKET_ID.COMM_SET_DUTY] + buffer
if set_command:
self._command = bytearray(create_packet(data))
return self._command
[docs] def comm_set_speed_ERPM(self, speed, set_command=True):
"""
send a certain motion speed to motor
Args:
speed: speed to use in ERPM, sign denotes direction
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
buffer=[]
buffer_append_int32(buffer, int(speed))
data = [COMM_PACKET_ID.COMM_SET_RPM] + buffer
if set_command:
self._command = bytearray(create_packet(data))
return self._command
[docs] def comm_set_current_loop(self, current, set_command=True):
"""
send am Iq current to motor, the motor output torque = Iq *KT, so it can
be used as a torque loop
Args:
current: q axis current to use in A, sign denotes direction
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
buffer=[]
buffer_append_int32(buffer, int(current*1000.0))
data = [COMM_PACKET_ID.COMM_SET_CURRENT] + buffer
if set_command:
self._command = bytearray(create_packet(data))
return self._command
[docs] def comm_set_position(self, pos, set_command=True):
"""
send a certain position to motor, the motor will run to the specified
position, (default speed 12000erpm acceleration 40000erpm)
Args:
pos: Desired position in degrees
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
buffer=[]
buffer_append_int32(buffer, int(pos*1000000))
data = [COMM_PACKET_ID.COMM_SET_POS] + buffer
if set_command:
self._command = bytearray(create_packet(data))
return self._command
[docs] def comm_set_position_velocity(self, pos, vel, acc, set_command=True):
"""
send a certain position, speed and acceleration to motor.
The motor will run at a given acceleration and maximum speed to a specified
position.
Args:
pos: Desired position in degrees
vel: Desired speed in ERPM
acc: Desired acceleration in ERPM/minute? TODO verify this
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
# 4 byte pos * 1000, 4 byte vel * 1, 4 byte a * 1
buffer=[]
buffer_append_int32(buffer, int(pos*1000000))
buffer_append_int32(buffer, int(vel))
buffer_append_int32(buffer, int(acc))
data = [COMM_PACKET_ID.COMM_SET_POS_SPD] + buffer
if set_command:
self._command = bytearray(create_packet(data))
return self._command
[docs] def comm_set_multi_turn(self, set_command=True):
"""
Tell the motor to operate in multi-turn mode, rather than being limited to
Just 360 degrees of position feedback.
Args:
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
cmd = bytearray([0x02 ,0x05 ,0x5C ,0x00 ,0x00 ,0x00 ,0x00 ,0x9E ,0x19 ,0x03])
if set_command:
self._command = cmd
return cmd
[docs] def set_zero_position(self, set_command=True):
"""
Set the current position of the motor to be the new zero position.
Args:
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
cmd = bytearray([0x02, 0x02, 0x5F, 0x01, 0x0E, 0xA0, 0x03])
if set_command:
self._command = bytearray([0x02, 0x02, 0x5F, 0x01, 0x0E, 0xA0, 0x03])
return cmd
[docs] def comm_begin_position_feedback(self, set_command=True):
"""
Tell the motor to send back its current position every 10ms
Args:
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
if set_command:
self._command = bytearray([0x02, 0x02, 0x0B, 0x04, 0x9C, 0x7E, 0x03])
[docs] def comm_get_motor_parameters(self, set_command=True):
"""
Request the current motor parameters
Args:
set_command: set the TMotorManager's current command to this if True
Returns:
The command as a bytearray
"""
header = COMM_PACKET_ID.COMM_GET_VALUES
data = [header]
cmd = bytearray(create_packet(data))
if set_command:
self._command = cmd
return cmd
# getters for motor state
[docs] def get_temperature_celsius(self):
"""
Returns:
The most recently updated motor temperature in degrees C.
"""
return self._motor_state.mos_temperature
[docs] def get_motor_error_code(self):
"""
Returns:
The most recently updated motor error code.
Note the program should throw a runtime error before you get a chance to read
this value if it is ever anything besides 0.
Codes:
0 : 'FAULT_CODE_NONE'
1 : 'FAULT_CODE_OVER_VOLTAGE'
2 : 'FAULT_CODE_UNDER_VOLTAGE'
3 : 'FAULT_CODE_DRIVE'
4 : 'FAULT_CODE_ABS_OVER_CURRENT'
5 : 'FAULT_CODE_OVER_TEMP_FET'
6 : 'FAULT_CODE_OVER_TEMP_MOTOR'
7 : 'FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE'
8 : 'FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE'
9 : 'FAULT_CODE_MCU_UNDER_VOLTAGE'
10 : 'FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET'
11 : 'FAULT_CODE_ENCODER_SPI'
12 : 'FAULT_CODE_ENCODER_SINCOS_BELOW_MIN_AMPLITUDE'
13 : 'FAULT_CODE_ENCODER_SINCOS_ABOVE_MAX_AMPLITUDE'
14 : 'FAULT_CODE_FLASH_CORRUPTION'
15 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_1'
16 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_2'
17 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_3'
18 : 'FAULT_CODE_UNBALANCED_CURRENTS'
"""
return self._motor_state.error
[docs] def get_motor_error_string(self):
return ERROR_CODES[self._motor_state.error]
[docs] def get_current_qaxis_amps(self):
"""
Returns:
The most recently updated qaxis current in amps
"""
return self._motor_state.iq_current
[docs] def get_current_daxis_amps(self):
"""
Returns:
The most recently updated qaxis current in amps
"""
return self._motor_state.id_current
[docs] def get_current_bus_amps(self):
"""
Returns:
The most recently updated qaxis current in amps
"""
return self._motor_state.input_current
[docs] def get_voltage_qaxis_volts(self):
"""
Returns:
The most recently updated qaxis voltage in volts
"""
return self._motor_state.Vq
[docs] def get_voltage_daxis_volts(self):
"""
Returns:
The most recently updated daxis voltage in volts
"""
return self._motor_state.Vd
[docs] def get_voltage_bus_volts(self):
"""
Returns:
The most recently updated input voltage in volts
"""
return self._motor_state.input_current
[docs] def get_output_angle_radians(self):
"""
Returns:
The most recently updated output angle in radians
"""
return self._motor_state.position * self.rad_per_Eang
[docs] def get_output_velocity_radians_per_second(self):
"""
Returns:
The most recently updated output velocity in radians per second
"""
return self._motor_state.speed * self.radps_per_ERPM
[docs] def get_output_acceleration_radians_per_second_squared(self):
"""
Returns:
The most recently updated output acceleration in radians per second per second
"""
return self._motor_state.acceleration * self.radps_per_ERPM
[docs] def get_output_torque_newton_meters(self):
"""
Returns:
the most recently updated output torque in Nm
"""
return self.get_current_qaxis_amps()*self.motor_params["Kt"]*self.motor_params["GEAR_RATIO"]
# user facing setters
[docs] def set_output_velocity_radians_per_second(self, vel):
"""
Update the current command to the desired velocity.
Note, this does not send a command, it updates the TMotorManager's saved command,
which will be sent when update() is called.
Args:
vel: The desired output speed in rad/s
"""
if np.abs(vel) >= self.motor_params["V_max"]:
raise RuntimeError("Cannot control using speed mode for angles with magnitude greater than " + str(self.motor_params["V_max"]) + "rad/s!")
if self._control_state not in [SERVO_SERIAL_CONTROL_STATE.VELOCITY]:
raise RuntimeError("Attempted to send speed command without entering speed control " + self.device_info_string())
self.comm_set_speed_ERPM(vel/self.radps_per_ERPM)
[docs] def set_duty_cycle_percent(self, duty):
"""
Update the current command to the desired duty cycle.
Note, this does not send a command, it updates the TMotorManager's saved command,
which will be sent when update() is called.
Args:
duty: The desired duty cycle -1.0 to 1.0
"""
if np.abs(duty) >= 1:
raise RuntimeError("Cannot control using duty cycle mode for more than 100 percent duty!")
if self._control_state not in [SERVO_SERIAL_CONTROL_STATE.DUTY_CYCLE]:
raise RuntimeError("Attempted to duty cycle command without entering duty cycle control " + self.device_info_string())
self.comm_set_duty_cycle(duty)
[docs] def set_motor_current_qaxis_amps(self, curr):
"""
Update the current command to the desired current.
Note, this does not send a command, it updates the TMotorManager's saved command,
which will be sent when update() is called.
Args:
curr: The desired q-axis current in A
"""
if np.abs(curr) >= self.motor_params["Curr_max"]:
raise RuntimeError("Cannot control using current mode with magnitude greater than " + str(self.motor_params["I_max"]) + "rad/s!")
if self._control_state not in [SERVO_SERIAL_CONTROL_STATE.CURRENT_LOOP]:
raise RuntimeError("Attempted to send current command without entering current control " + self.device_info_string())
self.comm_set_current_loop(curr)
[docs] def set_output_angle_radians(self, pos, vel=0.75, acc=0.5):
"""
Update the current command to the desired position, when in position or position-velocity mode.
Note, this does not send a command, it updates the TMotorManager's saved command,
which will be sent when update() is called.
Args:
pos: The desired output angle in rad
vel: The desired speed to get there in rad/s (when in POSITION_VELOCITY mode)
acc: The desired acceleration to get there in rad/s/s, ish (when in POSITION_VELOCITY mode)
"""
if np.abs(pos) >= self.motor_params["P_max"]:
raise RuntimeError("Cannot control using position mode for angles with magnitude greater than " + str(self.motor_params["P_max"]) + "rad!")
if np.abs(vel) >= self.motor_params["V_max"]:
raise RuntimeError("Cannot control velocities with magnitude greater than " + str(self.motor_params["V_max"]) + "rad/s!")
pos = (pos / self.rad_per_Eang)
vel = (vel / self.radps_per_ERPM)
acc = (acc / self.radps_per_ERPM)
if self._control_state == SERVO_SERIAL_CONTROL_STATE.POSITION_VELOCITY:
self.comm_set_position_velocity(pos, vel, acc)
elif self._control_state == SERVO_SERIAL_CONTROL_STATE.POSITION:
self.comm_set_position(pos)
else:
raise RuntimeError("Attempted to send position command without entering position control " + self.device_info_string())
[docs] def set_output_torque_newton_meters(self, torque):
"""
Update the current command to the desired current, based on the requested torque.
Note, this does not send a command, it updates the TMotorManager's saved command,
which will be sent when update() is called.
Args:
torque: The desired output torque in Nm.
"""
self.set_motor_current_qaxis_amps( (torque/self.motor_params["Kt"]/self.motor_params["GEAR_RATIO"]) )
# motor-side functions to account for the gear ratio
[docs] def set_motor_torque_newton_meters(self, torque):
"""
Version of set_output_torque that accounts for gear ratio to control motor-side torque
Args:
torque: The desired motor-side torque in Nm.
"""
self.set_output_torque_newton_meters(torque*self.motor_params["Kt"])
[docs] def set_motor_angle_radians(self, pos):
"""
Wrapper for set_output_angle that accounts for gear ratio to control motor-side angle
Args:
pos: The desired motor-side position in rad.
"""
self.set_output_angle_radians(pos/(self.motor_params["GEAR_RATIO"]) )
[docs] def set_motor_velocity_radians_per_second(self, vel):
"""
Wrapper for set_output_velocity that accounts for gear ratio to control motor-side velocity
Args:
vel: The desired motor-side velocity in rad/s.
"""
self.set_output_velocity_radians_per_second(vel/(self.motor_params["GEAR_RATIO"]) )
[docs] def get_motor_angle_radians(self):
"""
Wrapper for get_output_angle that accounts for gear ratio to get motor-side angle
Returns:
The most recently updated motor-side angle in rad.
"""
return self._motor_state.position*self.motor_params["GEAR_RATIO"]
[docs] def get_motor_velocity_radians_per_second(self):
"""
Wrapper for get_output_velocity that accounts for gear ratio to get motor-side velocity
Returns:
The most recently updated motor-side velocity in rad/s.
"""
return self._motor_state.velocity*self.motor_params["GEAR_RATIO"]
[docs] def get_motor_acceleration_radians_per_second_squared(self):
"""
Wrapper for get_output_acceleration that accounts for gear ratio to get motor-side acceleration
Returns:
The most recently updated motor-side acceleration in rad/s/s.
"""
return self._motor_state.acceleration*self.motor_params["GEAR_RATIO"]
[docs] def get_motor_torque_newton_meters(self):
"""
Wrapper for get_output_torque that accounts for gear ratio to get motor-side torque
Returns:
The most recently updated motor-side torque in Nm.
"""
return self.get_output_torque_newton_meters()*self.motor_params["GEAR_RATIO"]
# Pretty stuff
def __str__(self):
"""Prints the motor's device info and current state"""
return self.device_info_string() + " | Position: " + '{: 1f}'.format(round(self.get_output_angle_radians(),3)) + " rad | Velocity: " + '{: 1f}'.format(round(self.get_output_velocity_radians_per_second(),3)) + " rad/s | current: " + '{: 1f}'.format(round(self._motor_state.iq_current,3)) + " A | temp: " + '{: 1f}'.format(round(self._motor_state.mos_temperature,0)) + " C" + f' | Error: {self.get_motor_error_string()}'
[docs] def device_info_string(self):
"""Prints the motor's serial port and device type."""
return f"{self.motor_params['Type']} Port: {self.port}"
# controller variables
temperature = property(get_temperature_celsius, doc="temperature_degrees_C")
"""Temperature in Degrees Celsius"""
# TODO write actual codes in description here as well
error = property(get_motor_error_code, doc="Error")
"""Error Codes:
0 : 'FAULT_CODE_NONE'
1 : 'FAULT_CODE_OVER_VOLTAGE'
2 : 'FAULT_CODE_UNDER_VOLTAGE'
3 : 'FAULT_CODE_DRIVE'
4 : 'FAULT_CODE_ABS_OVER_CURRENT'
5 : 'FAULT_CODE_OVER_TEMP_FET'
6 : 'FAULT_CODE_OVER_TEMP_MOTOR'
7 : 'FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE'
8 : 'FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE'
9 : 'FAULT_CODE_MCU_UNDER_VOLTAGE'
10 : 'FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET'
11 : 'FAULT_CODE_ENCODER_SPI'
12 : 'FAULT_CODE_ENCODER_SINCOS_BELOW_MIN_AMPLITUDE'
13 : 'FAULT_CODE_ENCODER_SINCOS_ABOVE_MAX_AMPLITUDE'
14 : 'FAULT_CODE_FLASH_CORRUPTION'
15 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_1'
16 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_2'
17 : 'FAULT_CODE_HIGH_OFFSET_CURRENT_SENSOR_3'
18 : 'FAULT_CODE_UNBALANCED_CURRENTS'
"""
# electrical variables
current_qaxis = property(get_current_qaxis_amps, set_motor_current_qaxis_amps, doc="current_qaxis_amps")
"""Q-axis current in amps"""
current_daxis = property(get_current_daxis_amps, doc="current_daxis_amps")
"""D-axis current in amps"""
current_bus = property(get_current_bus_amps, doc="current_bus_amps")
"""Bus input current in amps"""
voltage_qaxis = property(get_voltage_qaxis_volts, doc="voltage_qaxis_volts")
"""Q-axis voltage in volts"""
voltage_daxis = property(get_voltage_daxis_volts, doc="voltage_daxis_volts")
"""D-axis voltage in volts"""
voltage_bus = property(get_voltage_bus_volts, doc="voltage_bus_volts")
"""Bus input voltage in volts"""
# output-side variables
position = property(get_output_angle_radians, set_output_angle_radians, doc="output_angle_radians")
"""Output angle in rad"""
velocity = property (get_output_velocity_radians_per_second, set_output_velocity_radians_per_second, doc="output_velocity_radians_per_second")
"""Output velocity in rad/s"""
acceleration = property(get_output_acceleration_radians_per_second_squared, doc="output_acceleration_radians_per_second_squared")
"""Output acceleration in rad/s/s"""
torque = property(get_output_torque_newton_meters, set_output_torque_newton_meters, doc="output_torque_newton_meters")
"""Output torque in Nm"""
# motor-side variables
angle_motorside = property(get_motor_angle_radians, set_motor_angle_radians, doc="motor_angle_radians")
"""Motor-side angle in rad"""
velocity_motorside = property (get_motor_velocity_radians_per_second, set_motor_velocity_radians_per_second, doc="motor_velocity_radians_per_second")
"""Motor-side velocity in rad/s"""
acceleration_motorside = property(get_motor_acceleration_radians_per_second_squared, doc="motor_acceleration_radians_per_second_squared")
"""Motor-side acceleration in rad/s/s"""
torque_motorside = property(get_motor_torque_newton_meters, set_motor_torque_newton_meters, doc="motor_torque_newton_meters")
"""Motor-side torque in Nm"""