
Spark structured streaming schema evolution #319

Open
richiesgr opened this issue Mar 23, 2023 · 2 comments

Comments

@richiesgr

richiesgr commented Mar 23, 2023

Hi @kevinwallimann
First, I'm aware of the issue.

I use ABRiS to stream Avro data into a Delta table. As mentioned in the issue, schema evolution is not working, whether using plain streaming or foreachBatch.

However, using this code I'm able to update the schema for each message, at least on the source side; the problem is that the change is never reflected on the sink (the code can easily be adapted to use ABRiS, by the way).

You said Spark can't handle this out of the box, which I can confirm, but do you have any idea how to implement it?
Thanks
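
For context, a minimal sketch of this kind of pipeline (Confluent-encoded Avro read from Kafka via ABRiS and written to Delta); the topic name, registry URL, brokers, and paths are placeholders. Note that the reader schema is resolved once, at planning time, which is why later schema versions never reach the sink while the query runs:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val spark = SparkSession.builder().appName("avro-to-delta").getOrCreate()

// The reader schema is downloaded here, once, when the query is planned
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my-topic")                   // placeholder topic
  .usingSchemaRegistry("http://schema-registry:8081") // placeholder URL

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder brokers
  .option("subscribe", "my-topic")
  .load()
  .select(from_avro(col("value"), abrisConfig).as("data"))
  .select("data.*")

// Delta's mergeSchema can add new columns on write, but only columns that
// are already present in the streaming DataFrame's (fixed) schema
stream.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/my-topic")
  .option("mergeSchema", "true")
  .start("/tmp/delta/my-table")
```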

@Orpheuz

Orpheuz commented Mar 27, 2023

Hey @richiesgr, the code I used in my examples is really similar to ABRiS; I think it might have some enhancements regarding Avro enum types, but overall it should be very similar. I don't have a concrete workaround to suggest either, but it would have to involve restarting/replanning the streaming job.

@kevinwallimann
Collaborator

Hi @richiesgr
Thanks for your question. In #176, I wrote that schema evolution is not possible during query execution; however, that does not mean schema evolution is generally impossible. You have to stop your query and restart it, thus letting Spark generate a new execution plan with the new schema. This is what we usually do to support schema evolution.
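
A minimal sketch of that stop-and-restart pattern, assuming hypothetical helpers fetchLatestSchemaId() and buildQuery() that stand in for whatever schema-registry lookup and query construction your job uses:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helpers: poll the registry for the latest schema version and
// build the full read/transform/write query for that version
def fetchLatestSchemaId(): Int = ???
def buildQuery(schemaId: Int): StreamingQuery = ???

var schemaId = fetchLatestSchemaId()
var query = buildQuery(schemaId)

// Runs forever; a real driver would also handle failures and shutdown
while (true) {
  // Block for up to a minute, then check whether the schema has evolved
  query.awaitTermination(60000L)
  val latest = fetchLatestSchemaId()
  if (latest != schemaId) {
    query.stop()                 // offsets are checkpointed, so no data is lost
    schemaId = latest
    query = buildQuery(schemaId) // Spark replans with the new schema
  }
}
```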

What I meant in #176 by making Spark change its execution plan even during a long-running Structured Streaming query is based on the realization that in micro-batch mode, Spark actually creates a series of query executions, each of which gets a new instance of the execution plan, see here: https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L647
You should also be able to see the execution plans per micro-batch in the Spark UI. So, theoretically, that execution plan could perhaps be changed across micro-batches. As this is in no way supported by Spark, you'd basically have to fork Spark, reimplement this part, and maintain it for future versions of Spark.
Even if you succeeded in doing this, there would be no way to control at exactly which message to create a new micro-batch to change the execution plan, so you'd have to somehow reimplement that part, too.

So basically, you'd have to maintain your own fork of Spark to support in-flight schema evolution without stopping and restarting the Spark query.
