Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FlatMapTuple typehint bug #33307

Merged
merged 13 commits into from
Dec 6, 2024
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name
if defaults or args or kwargs:
wrapper = lambda x, *args, **kwargs: fn(*(tuple(x) + args), **kwargs)
else:
wrapper = lambda x: fn(*x)
wrapper = lambda x: fn(*tuple(x))

# Proxy the type-hint information from the original function to this new
# wrapped function.
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/typehints/typed_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import typing
import unittest
from typing import Tuple

import apache_beam as beam
from apache_beam import pvalue
Expand Down Expand Up @@ -999,5 +1000,22 @@ def filter_fn(element: int) -> bool:
self.assertEqual(th.output_types, ((int, ), {}))


class TestFlatMapTuple(unittest.TestCase):
def test_flatmaptuple(self):
# Regression test. See
# https://github.com/apache/beam/issues/33014

def identity(x: Tuple[str, int]) -> Tuple[str, int]:
return x

with beam.Pipeline() as p:
# Just checking that this doesn't raise an exception.
(
p
| "Generate input" >> beam.Create([('P1', [2])])
| "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs])
| "Identity" >> beam.Map(identity))


if __name__ == '__main__':
unittest.main()
Loading