Skip to content

Commit 54e6287

Browse files
Brian Arnoldeywalker
authored andcommitted
Prevent run_async from removing tag columns when processing existing cached entries
1 parent 11623f5 commit 54e6287

1 file changed

Lines changed: 11 additions & 4 deletions

File tree

src/orcapod/core/streams/pod_node_stream.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,18 @@ async def run_async(
9595
.drop_columns([constants.INPUT_PACKET_HASH])
9696
)
9797

98-
existing = (
99-
all_results.filter(pc.is_valid(pc.field("_exists")))
100-
.drop_columns(target_entries.column_names)
101-
.drop_columns(["_exists"])
98+
existing = all_results.filter(
99+
pc.is_valid(pc.field("_exists"))
100+
).drop_columns(
101+
[
102+
"_exists",
103+
constants.INPUT_PACKET_HASH,
104+
constants.PACKET_RECORD_ID,
105+
*self.input_stream.keys()[1], # remove the input packet keys
106+
]
107+
# TODO: look into NOT fetching back the record ID
102108
)
109+
103110
renamed = [
104111
c.removesuffix("_right") if c.endswith("_right") else c
105112
for c in existing.column_names

0 commit comments

Comments
 (0)