Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,8 @@ def __init__(self, partitioning: HashBasedShufflePartitioning):
super().__init__(set_one_of(Partitioning, partitioning))
logger.debug(f"got {partitioning}")
self.batch_size = partitioning.batch_size
# Partitioning contains an ordered list of downstream worker ids.
# Currently we are using the index of such an order to choose
# a downstream worker to send tuples to.
# Must use dict.fromkeys to ensure the order of receiver workers
# from partitioning is preserved (using `{}` to create a set
# does not preserve order and will not work correctly.)
self.receivers = [
(rid, [])
for rid in dict.fromkeys(
channel.to_worker_id for channel in partitioning.channels
)
]
# Indexed by hash_code to choose the downstream worker to send to.
self.receivers = self.build_receiver_batches(partitioning.channels)
self.hash_attribute_names = partitioning.hash_attribute_names

@overrides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class Partitioner(ABC):
def __init__(self, partitioning: Message):
self.partitioning: Partitioning = get_one_of(partitioning)

@staticmethod
def build_receiver_batches(
channels,
) -> typing.List[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]:
# An ordered (receiver, batch) pair per distinct downstream worker.
# dict.fromkeys preserves the channel order; a set literal would not,
# which breaks input-port materialization reader threads.
return [
(rid, [])
for rid in dict.fromkeys(channel.to_worker_id for channel in channels)
]

def add_tuple_to_batch(
self, tuple_: Tuple
) -> Iterator[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,8 @@ def __init__(self, partitioning: RangeBasedShufflePartitioning):
super().__init__(set_one_of(Partitioning, partitioning))
logger.info(f"got {partitioning}")
self.batch_size = partitioning.batch_size
# Partitioning contains an ordered list of downstream worker ids.
# Currently we are using the index of such an order to choose
# a downstream worker to send tuples to.
# Must use dict.fromkeys to ensure the order of receiver workers
# from partitioning is preserved (using `{}` to create a set
# does not preserve order and will not work correctly.)
self.receivers = [
(rid, [])
for rid in dict.fromkeys(
channel.to_worker_id for channel in partitioning.channels
)
]
# Indexed by get_receiver_index to choose the downstream worker to send to.
self.receivers = self.build_receiver_batches(partitioning.channels)
self.range_attribute_names = partitioning.range_attribute_names
self.range_min = partitioning.range_min
self.range_max = partitioning.range_max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,8 @@ class RoundRobinPartitioner(Partitioner):
def __init__(self, partitioning: RoundRobinPartitioning):
super().__init__(set_one_of(Partitioning, partitioning))
self.batch_size = partitioning.batch_size
# Partitioning contains an ordered list of downstream worker ids.
# Currently we are using the index of such an order to choose
# a downstream worker to send tuples to.
# Must use dict.fromkeys to ensure the order of receiver workers
# from partitioning is preserved (using `{}` to create a set
# does not preserve order and will not work with input-port
# materialization reader threads.)
self.receivers = [
(rid, [])
for rid in dict.fromkeys(
channel.to_worker_id for channel in partitioning.channels
)
]
# Indexed by round_robin_index to choose the downstream worker to send to.
self.receivers = self.build_receiver_batches(partitioning.channels)
self.round_robin_index = 0

@overrides
Expand Down
Loading