I get the error below when I apply two relational_db.Write transforms to the same PCollection.

Strangely, it sometimes works and sometimes doesn't.

Going from persons_raw to persons_stage, I add 3 new columns to the dict: tra_personalEmail, tra_companyDomain, tra_linkedInProfile

I tried changing the order of the two writes, but that didn't help.

Can anyone help me?

Thanks in advance

Code:

with beam.Pipeline(options=pipeline_options) as p:

    persons_raw = (
        p
        | 'Create pipeline' >> beam.Create(['data.csv'])
        | 'Read multiline CSV' >> beam.FlatMap(
            lambda filename: csv.reader(
                io.TextIOWrapper(
                    beam.io.filesystems.FileSystems.open(input_path),
                    encoding='utf-8')))
        | 'Remove header' >> beam.Filter(lambda x: x[0] != 'FIRST NAME')
        | 'Raw schema' >> beam.Map(schema_persons_raw)
    )

    persons_raw_to_postgres = (
        persons_raw
        | 'Persons_raw to DB' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config_persons_raw)
    )

    persons_stage = (
        persons_raw
        | 'Personal E-mail' >> beam.Map(tra_personal_email)
        | 'Company domain' >> beam.Map(tra_company_domain)
        | 'Linkedin url' >> beam.Map(tra_linkedin_url)
    )

    persons_stage_to_postgres = (
        persons_stage
        | 'Persons_stage to DB' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config_persons_stage)
    )

Error:

sqlalchemy.exc.CompileError: Unconsumed column names: tra_personalEmail, tra_companyDomain, tra_linkedInProfile [while running 'Persons_raw to DB/ParDo(_WriteToRelationalDBFn)']

1 Answer

See this similar question: "CompileError when trying to run Insert Statement on SQLAlchemy".

The source of the confusion is that when you create tables in Postgres, it automatically lowercases your column names unless you wrap them in double quotes when naming them.
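
To make the case-folding point concrete, here is a small plain-Python sketch. It is not SQLAlchemy's or Postgres's actual code; fold_identifier and unconsumed_keys are hypothetical helpers that only imitate the rule described above (unquoted names are lowercased, double-quoted names are kept as written), and the sample row is made up.

```python
def fold_identifier(name: str) -> str:
    """Imitate how Postgres stores an identifier given in CREATE TABLE.

    Unquoted identifiers are folded to lower case; double-quoted ones
    keep their exact spelling. (Simplified: no escaped quotes.)
    """
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1]
    return name.lower()


def unconsumed_keys(row: dict, ddl_columns: list) -> list:
    """Return the row keys that match no column name as the database stores it."""
    stored = {fold_identifier(col) for col in ddl_columns}
    return [key for key in row if key not in stored]


# Made-up row using one of the camelCase keys from the question.
row = {"first_name": "Ada", "tra_personalEmail": "ada@example.com"}

# Column declared without quotes: stored as tra_personalemail, so the key is left over.
unquoted = unconsumed_keys(row, ["first_name", "tra_personalEmail"])

# Column declared with double quotes: the spelling is preserved and the key matches.
quoted = unconsumed_keys(row, ["first_name", '"tra_personalEmail"'])

print(unquoted)
print(quoted)
```

If this is the cause, the mismatch would show up on every run, so it is worth checking the actual column names in the database before ruling it in or out.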

1 Comment

Hi. Thank you for the answer, but I don't think that's the case, because when I comment out one of the DB write transforms, the other one works. Also, sometimes both work in the same run.
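
One explanation that would fit this comment, offered only as a guess because the question does not show tra_personal_email and the other transform functions: if they add keys to the incoming dict and return the same object, the raw branch and the stage branch can end up sharing that dict, and whether 'Persons_raw to DB' sees the extra keys depends on which branch handles the element first. Beam's programming guide asks that functions not modify their input elements. The plain-Python sketch below uses hypothetical function bodies and a made-up "email" field to show the difference between mutating and copying.

```python
def tra_personal_email_in_place(row: dict) -> dict:
    # Hypothetical body: mutates the dict it was given and returns the same object.
    row["tra_personalEmail"] = row["email"].lower()
    return row


def tra_personal_email_copy(row: dict) -> dict:
    # Hypothetical body: builds a new dict, leaving the input untouched.
    new_row = dict(row)
    new_row["tra_personalEmail"] = row["email"].lower()
    return new_row


# Mutating version: the "raw" element gains the stage-only key.
raw_a = {"first_name": "Ada", "email": "Ada@Example.com"}
stage_a = tra_personal_email_in_place(raw_a)
leaked = "tra_personalEmail" in raw_a

# Copying version: the "raw" element keeps only its original keys.
raw_b = {"first_name": "Ada", "email": "Ada@Example.com"}
stage_b = tra_personal_email_copy(raw_b)
kept_clean = "tra_personalEmail" not in raw_b

print(leaked, kept_clean)
```

If the real functions do mutate their input, returning a copy (for example dict(row) with the new keys added) from each of them would be the thing to try.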
