## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#importjsonfromtypingimportAny,Dict,List,Optional,TYPE_CHECKINGfrompyspark.errorsimportStreamingQueryException,PySparkValueErrorfrompyspark.errors.exceptions.capturedimport(StreamingQueryExceptionasCapturedStreamingQueryException,)frompyspark.sql.streaming.listenerimportStreamingQueryListenerifTYPE_CHECKING:frompy4j.java_gatewayimportJavaObject__all__=["StreamingQuery","StreamingQueryManager"]
class StreamingQuery:
    """
    A handle to a query that is executing continuously in the background as new data arrives.
    All these methods are thread-safe.

    .. versionadded:: 2.0.0

    .. versionchanged:: 3.5.0
        Supports Spark Connect.

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, jsq: "JavaObject") -> None:
        # Every method and property below delegates to this JVM-side query handle.
        self._jsq = jsq

    @property
    def id(self) -> str:
        """
        Returns the unique id of this query that persists across restarts from checkpoint data.
        That is, this id is generated when a query is started for the first time, and
        will be the same every time it is restarted from checkpoint data.
        There can only be one query with the same id active in a Spark cluster.
        Also see, `runId`.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        str
            The unique id of query that persists across restarts from checkpoint data.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get the unique id of this query that persists across restarts from checkpoint data

        >>> sq.id
        '...'

        >>> sq.stop()
        """
        return self._jsq.id().toString()

    @property
    def runId(self) -> str:
        """
        Returns the unique id of this query that does not persist across restarts. That is, every
        query that is started (or restarted from checkpoint) will have a different runId.

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        str
            The unique id of query that does not persist across restarts.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get the unique id of this query that does not persist across restarts

        >>> sq.runId
        '...'

        >>> sq.stop()
        """
        return self._jsq.runId().toString()

    @property
    def name(self) -> str:
        """
        Returns the user-specified name of the query, or null if not specified.
        This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
        as `dataframe.writeStream.queryName("query").start()`.
        This name, if set, must be unique across all active queries.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        str
            The user-specified name of the query, or null if not specified.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get the user-specified name of the query, or null if not specified.

        >>> sq.name
        'this_query'

        >>> sq.stop()
        """
        return self._jsq.name()

    @property
    def isActive(self) -> bool:
        """
        Whether this streaming query is currently active or not.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        bool
            The result whether specified streaming query is currently active or not.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sq.isActive
        True

        >>> sq.stop()
        """
        return self._jsq.isActive()

    def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
        """
        Waits for the termination of `this` query, either by :func:`query.stop()` or by an
        exception. If the query has terminated with an exception, then the exception will be
        thrown. If `timeout` is set, it returns whether the query has terminated or not within
        the `timeout` seconds.

        If the query has terminated, then all subsequent calls to this method will either return
        immediately (if the query was terminated by :func:`stop()`), or throw the exception
        immediately (if the query has terminated with exception).

        throws :class:`StreamingQueryException`, if `this` query has terminated with an exception

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Parameters
        ----------
        timeout : int, optional
            default ``None``. The waiting time for specified streaming query to terminate.
            The value is in seconds; a ``float`` is also accepted and is converted to whole
            milliseconds before being handed to the JVM.

        Returns
        -------
        bool, optional
            The result whether specified streaming query has terminated or not within the
            `timeout` seconds if `timeout` is set. The :class:`StreamingQueryException` will be
            thrown if the query has terminated with an exception.

        Raises
        ------
        PySparkValueError
            If `timeout` is given but is not an ``int``/``float`` strictly greater than zero.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('query_awaitTermination').start()

        Return whether the query has terminated or not within 5 seconds

        >>> sq.awaitTermination(5)
        False

        >>> sq.stop()
        """
        if timeout is None:
            return self._jsq.awaitTermination()

        if not isinstance(timeout, (int, float)) or timeout <= 0:
            raise PySparkValueError(
                error_class="VALUE_NOT_POSITIVE",
                # Report the offending value itself; the type name alone ("int")
                # tells the caller nothing about why the argument was rejected.
                message_parameters={"arg_name": "timeout", "arg_value": str(timeout)},
            )
        # The JVM call takes milliseconds, the Python API takes seconds.
        return self._jsq.awaitTermination(int(timeout * 1000))

    @property
    def status(self) -> Dict[str, Any]:
        """
        Returns the current status of the query.

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        dict
            The current status of the specified query.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get the current status of the query

        >>> sq.status
        {'message': '...', 'isDataAvailable': ..., 'isTriggerActive': ...}

        >>> sq.stop()
        """
        return json.loads(self._jsq.status().json())

    @property
    def recentProgress(self) -> List[Dict[str, Any]]:
        """
        Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
        The number of progress updates retained for each stream is configured by Spark session
        configuration `spark.sql.streaming.numRecentProgressUpdates`.

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        list
            List of dict which is the most recent :class:`StreamingQueryProgress` updates
            for this query.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get an array of the most recent query progress updates for this query

        >>> sq.recentProgress
        [...]

        >>> sq.stop()
        """
        return [json.loads(p.json()) for p in self._jsq.recentProgress()]

    @property
    def lastProgress(self) -> Optional[Dict[str, Any]]:
        """
        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
        None if there were no progress updates

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        dict, optional
            The most recent :class:`StreamingQueryProgress` update of this streaming query or
            None if there were no progress updates.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Get the most recent query progress updates for this query

        >>> sq.lastProgress
        >>> sq.stop()
        """
        progress = self._jsq.lastProgress()
        if progress:
            return json.loads(progress.json())
        return None

    def processAllAvailable(self) -> None:
        """
        Blocks until all available data in the source has been processed and committed to the
        sink. This method is intended for testing.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Notes
        -----
        In the case of continually arriving data, this method may block forever.
        Additionally, this method is only guaranteed to block until data that has been
        synchronously appended data to a stream source prior to invocation.
        (i.e. `getOffset` must immediately reflect the addition).

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Blocks query until all available data in the source
        has been processed and committed to the sink

        >>> sq.processAllAvailable
        <bound method StreamingQuery.processAllAvailable ...>

        >>> sq.stop()
        """
        return self._jsq.processAllAvailable()

    def explain(self, extended: bool = False) -> None:
        """
        Prints the (logical and physical) plans to the console for debugging purpose.

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Parameters
        ----------
        extended : bool, optional
            default ``False``. If ``False``, prints only the physical plan.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sdf.printSchema()
        root
         |-- timestamp: timestamp (nullable = true)
         |-- value: long (nullable = true)

        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
        >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.

        Explain the runtime plans

        >>> sq.explain()
        == Physical Plan ==
        ...

        >>> sq.explain(True)
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...

        >>> sq.stop()
        """
        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
        # We should print it in the Python process.
        print(self._jsq.explainInternal(extended))

    @staticmethod
    def _drop_java_type_prefix(text: str) -> str:
        """
        Strip the leading ``"<JavaExceptionType>: "`` from an exception's string form.

        Everything after the first ``": "`` is returned. If the separator is absent,
        `text` is returned unchanged instead of failing.
        """
        _, separator, remainder = text.partition(": ")
        return remainder if separator else text

    def exception(self) -> Optional[StreamingQueryException]:
        """
        Returns the exception that terminated this query, if there is one.

        .. versionadded:: 2.1.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        :class:`StreamingQueryException`
            the StreamingQueryException if the query was terminated by an exception, or None.
        """
        # Ask the JVM once and reuse the result for both the check and the read.
        maybe_exception = self._jsq.exception()
        if not maybe_exception.isDefined():
            return None

        je = maybe_exception.get()
        # Drop the Java StreamingQueryException type info
        msg = self._drop_java_type_prefix(je.toString())
        stackTrace = "\n\t at ".join(frame.toString() for frame in je.getStackTrace())
        return CapturedStreamingQueryException(msg, stackTrace, je.getCause())
class StreamingQueryManager:
    """A class to manage all the :class:`StreamingQuery` StreamingQueries active.

    .. versionadded:: 2.0.0

    .. versionchanged:: 3.5.0
        Supports Spark Connect.

    Notes
    -----
    This API is evolving.
    """

    def __init__(self, jsqm: "JavaObject") -> None:
        # Every method and property below delegates to this JVM-side manager handle.
        self._jsqm = jsqm

    @property
    def active(self) -> List[StreamingQuery]:
        """
        Returns a list of active queries associated with this SparkSession

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Returns
        -------
        list
            The active queries associated with this :class:`SparkSession`.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sdf.printSchema()
        root
         |-- timestamp: timestamp (nullable = true)
         |-- value: long (nullable = true)

        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sqm = spark.streams

        Get the list of active streaming queries

        >>> [q.name for q in sqm.active]
        ['this_query']
        >>> sq.stop()
        """
        return [StreamingQuery(jsq) for jsq in self._jsqm.active()]

    def get(self, id: str) -> Optional[StreamingQuery]:
        """
        Returns an active query from this :class:`SparkSession`.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Parameters
        ----------
        id : str
            The unique id of specified query.

        Returns
        -------
        :class:`StreamingQuery`, optional
            An active query with `id` from this SparkSession, or ``None`` when the
            underlying manager returns no query for `id`.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sdf.printSchema()
        root
         |-- timestamp: timestamp (nullable = true)
         |-- value: long (nullable = true)

        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
        >>> sq.name
        'this_query'

        Get an active query by id

        >>> sq = spark.streams.get(sq.id)
        >>> sq.isActive
        True

        >>> sq.stop()
        """
        query = self._jsqm.get(id)
        if query is None:
            return None
        return StreamingQuery(query)

    def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
        """
        Wait until any of the queries on the associated SparkSession has terminated since the
        creation of the context, or since :func:`resetTerminated()` was called. If any query was
        terminated with an exception, then the exception will be thrown.
        If `timeout` is set, it returns whether the query has terminated or not within the
        `timeout` seconds.

        If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
        either return immediately (if the query was terminated by :func:`query.stop()`),
        or throw the exception immediately (if the query was terminated with exception). Use
        :func:`resetTerminated()` to clear past terminations and wait for new terminations.

        In the case where multiple queries have terminated since :func:`resetTerminated()`
        was called, if any query has terminated with exception, then
        :func:`awaitAnyTermination()` will throw any one of the exceptions. For correctly
        documenting exceptions across multiple queries, users need to stop all of them after
        any of them terminates with exception, and then check the `query.exception()` for each
        query.

        throws :class:`StreamingQueryException`, if any query has terminated with an exception

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Parameters
        ----------
        timeout : int, optional
            default ``None``. The waiting time for any streaming query to terminate.
            The value is in seconds; a ``float`` is also accepted and is converted to whole
            milliseconds before being handed to the JVM. Zero is accepted.

        Returns
        -------
        bool, optional
            The result whether any streaming query has terminated or not within the `timeout`
            seconds if `timeout` is set. The :class:`StreamingQueryException` will be thrown if
            any query has terminated with an exception.

        Raises
        ------
        PySparkValueError
            If `timeout` is given but is not an ``int``/``float``, or is negative.

        Examples
        --------
        >>> sdf = spark.readStream.format("rate").load()
        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()

        Return whether any of the query on the associated SparkSession
        has terminated or not within 5 seconds

        >>> spark.streams.awaitAnyTermination(5)
        True

        >>> sq.stop()
        """
        if timeout is None:
            return self._jsqm.awaitAnyTermination()

        if not isinstance(timeout, (int, float)) or timeout < 0:
            raise PySparkValueError(
                error_class="VALUE_NOT_POSITIVE",
                # Report the offending value itself; the type name alone ("int")
                # tells the caller nothing about why the argument was rejected.
                message_parameters={"arg_name": "timeout", "arg_value": str(timeout)},
            )
        # The JVM call takes milliseconds, the Python API takes seconds.
        return self._jsqm.awaitAnyTermination(int(timeout * 1000))

    def resetTerminated(self) -> None:
        """
        Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
        again to wait for new terminations.

        .. versionadded:: 2.0.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Examples
        --------
        >>> spark.streams.resetTerminated()
        """
        self._jsqm.resetTerminated()

    def addListener(self, listener: StreamingQueryListener) -> None:
        """
        Register a :class:`StreamingQueryListener` to receive up-calls for life cycle events of
        :class:`~pyspark.sql.streaming.StreamingQuery`.

        .. versionadded:: 3.4.0

        .. versionchanged:: 3.5.0
            Supports Spark Connect.

        Parameters
        ----------
        listener : :class:`StreamingQueryListener`
            A :class:`StreamingQueryListener` to receive up-calls for life cycle events of
            :class:`~pyspark.sql.streaming.StreamingQuery`.

        Notes
        -----
        This function behaves differently in Spark Connect mode.
        In Connect, the provided functions doesn't have access to variables defined outside of it.
        Also in Connect, you need to use `self.spark` to access spark session.
        Using `spark` would throw an exception.
        In short, if you want to use spark session inside the listener,
        please use `self.spark` in Connect mode, and use `spark` otherwise.

        Examples
        --------
        >>> from pyspark.sql.streaming import StreamingQueryListener
        >>> class TestListener(StreamingQueryListener):
        ...     def onQueryStarted(self, event):
        ...         pass
        ...
        ...     def onQueryProgress(self, event):
        ...         pass
        ...
        ...     def onQueryIdle(self, event):
        ...         pass
        ...
        ...     def onQueryTerminated(self, event):
        ...         pass
        ...
        >>> test_listener = TestListener()

        Register streaming query listener

        >>> spark.streams.addListener(test_listener)

        Deregister streaming query listener

        >>> spark.streams.removeListener(test_listener)
        """
        from py4j.java_gateway import java_import
        from pyspark import SparkContext
        from pyspark.java_gateway import ensure_callback_server_started

        gw = SparkContext._gateway
        assert gw is not None
        java_import(gw.jvm, "org.apache.spark.sql.streaming.*")
        # The callback server is started before the listener is handed to the JVM.
        ensure_callback_server_started(gw)
        self._jsqm.addListener(listener._jlistener)

    def removeListener(self, listener: StreamingQueryListener) -> None:
        """
        Deregister a :class:`StreamingQueryListener`.

        .. versionadded:: 3.4.0

        Parameters
        ----------
        listener : :class:`StreamingQueryListener`
            A :class:`StreamingQueryListener` to receive up-calls for life cycle events of
            :class:`~pyspark.sql.streaming.StreamingQuery`.

        Examples
        --------
        >>> from pyspark.sql.streaming import StreamingQueryListener
        >>> class TestListener(StreamingQueryListener):
        ...     def onQueryStarted(self, event):
        ...         pass
        ...
        ...     def onQueryProgress(self, event):
        ...         pass
        ...
        ...     def onQueryIdle(self, event):
        ...         pass
        ...
        ...     def onQueryTerminated(self, event):
        ...         pass
        ...
        >>> test_listener = TestListener()

        Register streaming query listener

        >>> spark.streams.addListener(test_listener)

        Deregister streaming query listener

        >>> spark.streams.removeListener(test_listener)
        """
        self._jsqm.removeListener(listener._jlistener)