diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 9c798d3ce6dc..4d1678d72a69 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2238,7 +2238,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name if defaults or args or kwargs: wrapper = lambda x, *args, **kwargs: fn(*(tuple(x) + args), **kwargs) else: - wrapper = lambda x: fn(*x) + wrapper = lambda x: fn(*tuple(x)) # Proxy the type-hint information from the original function to this new # wrapped function. diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 44318fa44a8c..820f78fa9ef5 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -21,6 +21,7 @@ import typing import unittest +from typing import Tuple import apache_beam as beam from apache_beam import pvalue @@ -999,5 +1000,22 @@ def filter_fn(element: int) -> bool: self.assertEqual(th.output_types, ((int, ), {})) +class TestFlatMapTuple(unittest.TestCase): + def test_flatmaptuple(self): + # Regression test. See + # https://github.com/apache/beam/issues/33014 + + def identity(x: Tuple[str, int]) -> Tuple[str, int]: + return x + + with beam.Pipeline() as p: + # Just checking that this doesn't raise an exception. + ( + p + | "Generate input" >> beam.Create([('P1', [2])]) + | "Flat" >> beam.FlatMapTuple(lambda k, vs: [(k, v) for v in vs]) + | "Identity" >> beam.Map(identity)) + + if __name__ == '__main__': unittest.main()