Skip to content

Commit 6a4ba8e

Browse files
HovanNgoOpentronsryanthecoderrclarke0
authored
feat(hardware-testing, api): CNC firmware and scripts to measure labware (#20253)
# Overview PR for touch probe proof of concept. Adds firmware, drivers, and hardware testing scripts for the touch probe. https://docs.google.com/document/d/151cAolVlUmFbdYdXOCe5GIgDmzycKI_iPKSS7zWbS0I/edit?tab=t.0 ## Changelog -added touch probe firmware + drivers -added touch_probe mock scripts to hardware testing --------- Co-authored-by: Ryan Howard <[email protected]> Co-authored-by: Rhyann Clarke <[email protected]> Co-authored-by: Rhyann Clarke <[email protected]>
1 parent 4c2e081 commit 6a4ba8e

File tree

14 files changed

+800
-25
lines changed

14 files changed

+800
-25
lines changed

api/src/opentrons/hardware_control/backends/ot3controller.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@
172172
liquid_probe,
173173
check_overpressure,
174174
grab_pressure,
175+
touch_probe,
175176
)
176177
from opentrons_hardware.hardware_control.rear_panel_settings import (
177178
get_door_state,
@@ -1524,6 +1525,22 @@ def _pop_queue() -> Optional[Tuple[NodeId, ErrorCode]]:
15241525
else:
15251526
yield
15261527

1528+
async def touch_probe(self, axis: Axis, speed: float, distance: float) -> float:
1529+
1530+
status = await touch_probe(
1531+
messenger=self._messenger,
1532+
mover=axis_to_node(axis),
1533+
distance=distance,
1534+
speed=speed,
1535+
)
1536+
1537+
self._position[axis_to_node(axis)] = status.motor_position
1538+
1539+
if status.move_ack != MoveCompleteAck.stopped_by_condition:
1540+
raise Exception("Booo")
1541+
1542+
return self._position[axis_to_node(axis)]
1543+
15271544
async def liquid_probe(
15281545
self,
15291546
mount: OT3Mount,

api/src/opentrons/hardware_control/ot3api.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3302,3 +3302,26 @@ async def read_stem_capacitance(
33023302
realmount = OT3Mount.from_mount(mount)
33033303
s_data = await self._backend.read_capacitive_sensor(realmount, primary)
33043304
return s_data if s_data else 0.0
3305+
3306+
async def touch_probe(
3307+
self,
3308+
mount: Union[top_types.Mount, OT3Mount],
3309+
axis: Axis,
3310+
speed: float,
3311+
distance: float,
3312+
) -> top_types.Point:
3313+
await self._backend.touch_probe(axis, speed, distance) # type: ignore[attr-defined]
3314+
realmount = OT3Mount.from_mount(mount)
3315+
machine_pos = await self._backend.update_position()
3316+
end_pos = self.get_deck_from_machine(machine_pos)
3317+
end_point = top_types.Point(
3318+
end_pos[Axis.X], end_pos[Axis.Y], end_pos[Axis.by_mount(realmount)]
3319+
)
3320+
offset = offset_for_mount(
3321+
realmount,
3322+
top_types.Point(*self._config.left_mount_offset),
3323+
top_types.Point(*self._config.right_mount_offset),
3324+
top_types.Point(*self._config.gripper_mount_offset),
3325+
)
3326+
cp = self.critical_point_for(realmount, None)
3327+
return end_point + offset + cp

hardware-testing/hardware_testing/drivers/asair_sensor.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -164,22 +164,24 @@ def get_reading(self, retries: int = 5) -> Reading:
164164
log.debug(f"sending {data_packet}")
165165
command_bytes = codecs.decode(data_packet.encode(), "hex")
166166
try:
167-
self._th_sensor.flushInput()
168-
self._th_sensor.flushOutput()
167+
self._th_sensor.flushInput() # type: ignore[attr-defined]
168+
self._th_sensor.flushOutput() # type: ignore[attr-defined]
169+
169170
self._th_sensor.write(command_bytes)
170171
time.sleep(0.1)
171172

172-
length = self._th_sensor.inWaiting()
173+
length = self._th_sensor.inWaiting() # type: ignore[attr-defined]
173174
res = self._th_sensor.read(length)
174-
log.debug(f"received {res}")
175+
log.debug(f"received {res!r}")
175176

176-
res = codecs.encode(res, "hex")
177-
relative_hum = res[6:10]
178-
temp = res[10:14]
179-
log.info(f"Temp: {temp}, RelativeHum: {relative_hum}")
177+
res_hex = codecs.encode(res, "hex").decode("ascii")
178+
relative_hum_str = res_hex[6:10]
179+
temp_str = res_hex[10:14]
180+
log.info(f"Temp: {temp_str!r}, RelativeHum: {relative_hum_str!r}")
181+
182+
temp = float(int(temp_str, 16)) / 10
183+
relative_hum = float(int(relative_hum_str, 16)) / 10
180184

181-
temp = float(int(temp, 16)) / 10
182-
relative_hum = float(int(relative_hum, 16)) / 10
183185
return Reading(temperature=temp, relative_humidity=relative_hum)
184186

185187
except (IndexError, ValueError) as e:
@@ -202,15 +204,15 @@ def get_serial(self) -> str:
202204
log.debug(f"sending {data_packet}")
203205
command_bytes = codecs.decode(data_packet.encode(), "hex")
204206
try:
205-
self._th_sensor.flushInput()
206-
self._th_sensor.flushOutput()
207+
self._th_sensor.flushInput() # type: ignore[attr-defined]
208+
self._th_sensor.flushOutput() # type: ignore[attr-defined]
207209
self._th_sensor.write(command_bytes)
208210
time.sleep(0.1)
209211

210-
length = self._th_sensor.inWaiting()
212+
length = self._th_sensor.inWaiting() # type: ignore[attr-defined]
211213
res = self._th_sensor.read(length)
212214
res = codecs.encode(res, "hex")
213-
log.debug(f"received {res}")
215+
log.debug(f"received {res!r}")
214216
dev_id = res[6:14]
215217
return dev_id.decode()
216218

hardware-testing/hardware_testing/drivers/mark10/mark10_fg.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ def read_force(self, timeout: float = 1.0) -> float:
103103
try:
104104
force_val, units = line.split(" ")
105105
if units != "N":
106-
self._force_guage.write("N\r\n") # Set force gauge units to Newtons
106+
self._force_guage.write(
107+
"N\r\n".encode("utf-8")
108+
) # Set force gauge units to Newtons
107109
print(f'Setting gauge units from {units} to "N" (newtons)')
108110
continue
109111
else:

hardware-testing/hardware_testing/drivers/pressure_fixture.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def create(cls, port: str, slot_side: str = "left") -> "PressureFixture":
158158
def connect(self) -> None:
159159
"""Connect."""
160160
self._port.open()
161-
self._port.flushInput()
161+
self._port.flushInput() # type: ignore[attr-defined]
162162
# NOTE: device might take a few seconds to boot up
163163
sleep(FIXTURE_REBOOT_TIME)
164164
fw_version = self.firmware_version()

hardware-testing/hardware_testing/drivers/radwag/driver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ def _write_command(self, cmd: str) -> None:
121121
def _read_response(
122122
self, command: RadwagCommand, timeout: Optional[float] = None
123123
) -> RadwagResponse:
124+
prev_timeout: float = float(self._connection.timeout or 0.0)
124125
if timeout is not None:
125-
prev_timeout = float(self._connection.timeout)
126126
self._connection.timeout = timeout
127127
response = self._connection.readline()
128128
self._connection.timeout = prev_timeout
129129
else:
130130
response = self._connection.readline()
131-
self._raw_log.write(f"{datetime.datetime.now()} <-- {response}\n")
131+
self._raw_log.write(f"{datetime.datetime.now()} <-- {response!r}\n")
132132
data = radwag_response_parse(response.decode("utf-8"), command)
133133
return data
134134

hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
import time
55
from typing import Union, Optional, Any, List
6-
import serial # type: ignore[import-untyped]
7-
from serial.tools.list_ports import comports # type: ignore[import-untyped]
6+
import serial # type: ignore[import]
7+
from serial.tools.list_ports import comports # type: ignore[import]
88

99
ReceiveBuffer = 100
1010

@@ -42,7 +42,7 @@ def init_serial(self, baud: int) -> None:
4242
bytesize=serial.EIGHTBITS,
4343
timeout=1,
4444
)
45-
if self.com.isOpen():
45+
if self.com.isOpen(): # type: ignore[attr-defined]
4646
print(f"{self.device} Opened! \n")
4747
# settings
4848
self.com.bytesize = serial.EIGHTBITS # 数据位 8
@@ -75,9 +75,9 @@ def write_and_get_buffer(
7575
return None
7676
if not isinstance(send, bytes):
7777
send = (str(send) + "\r\n").encode("utf-8")
78-
self.com.flushInput() # type: ignore [union-attr]
79-
self.com.flushOutput() # type: ignore [union-attr]
80-
self.com.write(send) # type: ignore [union-attr]
78+
self.com.flushInput() # type: ignore[attr-defined]
79+
self.com.flushOutput() # type: ignore[attr-defined]
80+
self.com.write(send) # type: ignore[attr-defined]
8181
time.sleep(0.1)
8282
if delay is None:
8383
pass
@@ -86,7 +86,7 @@ def write_and_get_buffer(
8686
if only_write is True:
8787
return None
8888
for i in range(times):
89-
data = self.com.read(ReceiveBuffer) # type: ignore [union-attr]
89+
data = self.com.read(ReceiveBuffer) # type: ignore[attr-defined]
9090
if type(data) is not bytes:
9191
if "OK" not in data.decode("utf-8") or "busy" in data.decode("utf-8"):
9292
time.sleep(1)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""CNC Driver."""
2+
from .dimensions import ProbeTarget
3+
from .touch_probe import TouchProbe, ProbeConfig
4+
5+
__all__ = ["ProbeConfig", "ProbeTarget", "TouchProbe"]
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""Proof of concept for use of touch probe."""
2+
import argparse
3+
import asyncio
4+
from hardware_testing.opentrons_api import helpers_ot3
5+
from hardware_testing.opentrons_api.types import OT3Mount, Axis, Point
6+
from hardware_testing.drivers.touch_probe import TouchProbe, ProbeConfig
7+
8+
# ============================================================================
9+
# Main Function
10+
# ============================================================================
11+
12+
13+
async def _main(simulating: bool, mount: OT3Mount, num_wells: int, slot: int) -> None:
14+
config = ProbeConfig()
15+
api = await helpers_ot3.build_async_ot3_hardware_api(
16+
is_simulating=simulating, use_defaults=True
17+
)
18+
await api.home()
19+
await api.cache_instruments()
20+
tp = TouchProbe(api, mount, config)
21+
# probe deck surface
22+
print(f"\nProbing slot center #{6}")
23+
deck_pos = await tp.get_deck_z(6)
24+
print(f"z deck pos: {deck_pos.z}")
25+
deck_square_center_radius = await tp.search_hole_and_center(deck_pos)
26+
if deck_square_center_radius:
27+
deck_square_center, deck_square_radius = deck_square_center_radius
28+
print(
29+
f"Deck square center: {deck_square_center} length: {deck_square_radius * 2}"
30+
)
31+
32+
# probe gauge block
33+
print(f"\nProbing Gauge Block on slot #{7}")
34+
block_dims = await tp.calibrate_labware(7, deck_pos)
35+
if not block_dims:
36+
print("Calibration failed.")
37+
return
38+
39+
print("\nCalibration Block Dimensions:")
40+
print(f" Width: {block_dims.width:.3f} mm")
41+
print(f" Length: {block_dims.length:.3f} mm")
42+
print(f" Height: {block_dims.height:.3f} mm")
43+
44+
# probe module
45+
print(f"\nProbing Module on slot #{4}")
46+
module_pos = await tp.get_deck_z(4)
47+
print(f"z module pos: {module_pos.z}")
48+
await tp.search_hole_and_center(module_pos)
49+
50+
# Probe labware
51+
print(f"\nStarting Calibration on slot #{slot}")
52+
dimensions = await tp.calibrate_labware(slot, deck_pos)
53+
if not dimensions:
54+
print("Calibration failed.")
55+
return
56+
57+
dimensions.set_num_wells(num_wells)
58+
59+
# calculate A1 pos
60+
first_well_xy = Point(
61+
x=dimensions.x_min.x + dimensions.x_offset,
62+
y=dimensions.y_max.y - dimensions.y_offset,
63+
z=dimensions.z_max.z,
64+
)
65+
66+
# Detecting A1 + center
67+
well_center_and_radius = await tp.search_hole_and_center(first_well_xy)
68+
if well_center_and_radius:
69+
well_center, dimensions.radius = well_center_and_radius
70+
brim_z = await tp.get_brim_height(well_center, dimensions.radius)
71+
if brim_z is not None:
72+
dimensions.z_max = dimensions.z_max._replace(z=brim_z)
73+
well_center = well_center._replace(z=brim_z)
74+
dimensions.well_bottom = await tp.get_bottom(well_center)
75+
else:
76+
print("Unable to determine well dimensions.")
77+
78+
# Print final dimensions using dataclass properties
79+
print("\nLabware Dimensions:")
80+
print(f" Deck Radius: {deck_square_radius:.3f} mm")
81+
print(f" Deck Center: {deck_square_center} mm")
82+
print(f" Width: {dimensions.width:.3f} mm")
83+
print(f" Length: {dimensions.length:.3f} mm")
84+
print(f" Height: {dimensions.height:.3f} mm")
85+
print(f" Depth: {dimensions.depth:.3f} mm")
86+
print(f" Bottom Offset: {dimensions.bottom_offset:.3f} mm")
87+
print(f" Well Radius: {dimensions.radius:.3f} mm")
88+
print(f" Well Center: {well_center} mm")
89+
print(f" radius: {dimensions.radius} mm")
90+
print(f" spacing: {dimensions.spacing}")
91+
await asyncio.sleep(3.0)
92+
await api.home([Axis.Z, Axis.X, Axis.Y])
93+
return
94+
95+
96+
if __name__ == "__main__":
97+
print("\nTouch Probe Test\n")
98+
arg_parser = argparse.ArgumentParser(description="Touch Probe Test")
99+
arg_parser.add_argument("--simulate", action="store_true")
100+
arg_parser.add_argument("--num_wells", type=int, default=96)
101+
arg_parser.add_argument("--slot", type=int, default=5)
102+
args = arg_parser.parse_args()
103+
mount = OT3Mount.LEFT
104+
asyncio.run(_main(args.simulate, mount, args.num_wells, args.slot))
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""Data and configuration ."""
2+
from hardware_testing.opentrons_api.types import Point
3+
from typing import Optional
4+
from dataclasses import dataclass
5+
6+
# ============================================================================
7+
# Data
8+
# ============================================================================
9+
10+
11+
@dataclass
12+
class ProbeTarget:
13+
"""Global configuration of dimensions found."""
14+
15+
deck_pos: Point
16+
x_min: Point
17+
x_max: Point
18+
y_min: Point
19+
y_max: Point
20+
z_max: Point # top of object
21+
radius: Optional[float]
22+
23+
@property
24+
def width(self) -> float:
25+
"""Width of the probed area (X dimension)."""
26+
return abs(self.x_max.x - self.x_min.x)
27+
28+
@property
29+
def length(self) -> float:
30+
"""Length of the probed area (Y dimension)."""
31+
return abs(self.y_max.y - self.y_min.y)
32+
33+
@property
34+
def height(self) -> float:
35+
"""Height of the probed area (Z dimension from deck to top)."""
36+
return self.z_max.z - self.deck_pos.z
37+
38+
39+
@dataclass
40+
class LabwareDims(ProbeTarget):
41+
"""Container for probe results with computed properties."""
42+
43+
well_bottom: Optional[Point] = None
44+
num_wells: Optional[int] = 96
45+
x_offset: float = 0.0
46+
y_offset: float = 0.0
47+
spacing: float = 0.0
48+
49+
def set_num_wells(self, num_wells: int) -> None:
50+
"""Determines dimensions based on well number."""
51+
self.num_wells = num_wells
52+
match num_wells:
53+
case 96:
54+
self.x_offset = 14.38
55+
self.y_offset = 11.23
56+
self.spacing = 9.0
57+
case 384:
58+
self.x_offset = 11.3
59+
self.y_offset = 8.5
60+
self.spacing = 3.4
61+
case _:
62+
raise ValueError(f"Unsupported num_wells: {num_wells}")
63+
64+
@property
65+
def depth(self) -> Optional[float]:
66+
"""Return the vertical distance from the top to the bottom of the hole, or None if unknown."""
67+
if self.well_bottom is None:
68+
return None
69+
return self.z_max.z - self.well_bottom.z
70+
71+
@property
72+
def bottom_offset(self) -> Optional[float]:
73+
"""Height of the bottom offset (Z dimension from hole bottom to deck)."""
74+
if self.well_bottom is None:
75+
return None
76+
return self.well_bottom.z - self.deck_pos.z

0 commit comments

Comments
 (0)