public class StreamingContext extends Object implements Logging
Main entry point for Spark Streaming functionality. It provides methods used to create DStreams
from various input sources. It can be created either by providing a Spark master URL and an
appName, from an org.apache.spark.SparkConf configuration (see the core Spark documentation), or
from an existing org.apache.spark.SparkContext. The associated SparkContext can be accessed using
context.sparkContext. After creating and transforming DStreams, the streaming computation can be
started and stopped using context.start() and context.stop(), respectively.
context.awaitTermination() allows the current thread to wait for the termination of the context
by stop() or by an exception.

Constructor and Description |
---|
StreamingContext(SparkConf conf, Duration batchDuration): Create a StreamingContext by providing the configuration necessary for a new SparkContext. |
StreamingContext(SparkContext sparkContext, Duration batchDuration): Create a StreamingContext using an existing SparkContext. |
StreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf): Recreate a StreamingContext from a checkpoint file. |
StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment): Create a StreamingContext by providing the details necessary for creating a new SparkContext. |
Modifier and Type | Method and Description |
---|---|
<T> ReceiverInputDStream<T> | actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy, scala.reflect.ClassTag<T> evidence$3): Create an input stream with any arbitrary user implemented actor receiver. |
void | addStreamingListener(StreamingListener streamingListener): Add a StreamingListener object for receiving system events related to streaming. |
void | awaitTermination(): Wait for the execution to stop. |
void | awaitTermination(long timeout): Wait for the execution to stop. |
void | checkpoint(String directory): Set the context to periodically checkpoint the DStream operations for driver fault-tolerance. |
String | checkpointDir() |
Duration | checkpointDuration() |
SparkConf | conf() |
static int | DEFAULT_CLEANER_TTL() |
SparkEnv | env() |
<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> | fileStream(String directory, scala.reflect.ClassTag<K> evidence$6, scala.reflect.ClassTag<V> evidence$7, scala.reflect.ClassTag<F> evidence$8): Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> | fileStream(String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<F> evidence$11): Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
static StreamingContext | getOrCreate(String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError): Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
org.apache.spark.streaming.DStreamGraph | graph() |
boolean | isCheckpointPresent() |
static scala.Option<String> | jarOfClass(Class<?> cls): Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext. |
<T> ReceiverInputDStream<T> | networkStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1): Create an input stream with any arbitrary user implemented receiver. |
<T> InputDStream<T> | queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$12): Create an input stream from a queue of RDDs. |
<T> InputDStream<T> | queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$13): Create an input stream from a queue of RDDs. |
<T> ReceiverInputDStream<T> | rawSocketStream(String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$5): Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them. |
<T> ReceiverInputDStream<T> | receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$2): Create an input stream with any arbitrary user implemented receiver. |
void | remember(Duration duration): Set each DStream in this context to remember the RDDs it generated in the last given duration. |
SparkContext | sc() |
org.apache.spark.streaming.scheduler.JobScheduler | scheduler() |
<T> ReceiverInputDStream<T> | socketStream(String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$4): Create an input stream from TCP source hostname:port. |
ReceiverInputDStream<String> | socketTextStream(String hostname, int port, StorageLevel storageLevel): Create an input stream from TCP source hostname:port. |
SparkContext | sparkContext(): Return the associated Spark context. |
void | start(): Start the execution of the streams. |
scala.Enumeration.Value | state() |
void | stop(boolean stopSparkContext): Stop the execution of the streams immediately (does not wait for all received data to be processed). |
void | stop(boolean stopSparkContext, boolean stopGracefully): Stop the execution of the streams, with the option of ensuring all received data has been processed. |
org.apache.spark.streaming.StreamingContext.StreamingContextState$ | StreamingContextState(): Accessor for nested Scala object. |
DStream<String> | textFileStream(String directory): Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). |
static <K,V> PairDStreamFunctions<K,V> | toPairDStreamFunctions(DStream<scala.Tuple2<K,V>> stream, scala.reflect.ClassTag<K> kt, scala.reflect.ClassTag<V> vt, scala.math.Ordering<K> ord) |
<T> DStream<T> | transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$15): Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. |
org.apache.spark.streaming.ui.StreamingTab | uiTab() |
<T> DStream<T> | union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$14): Create a unified DStream from multiple DStreams of the same type and same slide duration. |
org.apache.spark.streaming.ContextWaiter | waiter() |
Methods inherited from class Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Logging: initialized, initializeIfNecessary, initializeLogging, initLock, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logTrace, logTrace, logWarning, logWarning
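As a minimal sketch of the lifecycle described in the class overview (create a context, define a DStream, start it, then wait), the following Scala program reads lines from a TCP socket and prints each batch. The host `localhost`, port `9999`, master `local[2]`, the one-second batch interval, and the object name `LifecycleSketch` are illustrative assumptions, not values taken from this page.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LifecycleSketch {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing (assumed local setup).
    val conf = new SparkConf().setMaster("local[2]").setAppName("LifecycleSketch")

    // StreamingContext(SparkConf, Duration): incoming data is divided into 1-second batches.
    val ssc = new StreamingContext(conf, Seconds(1))

    // socketTextStream with the default storage level; host and port are placeholders.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print() // print the first few elements of every batch

    ssc.start()            // begin receiving and processing
    ssc.awaitTermination() // block until stop() is called or an exception is thrown
  }
}
```

To try it locally, something must be listening on the assumed port before the program starts, for example a netcat server.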
public StreamingContext(SparkContext sparkContext, Duration batchDuration)
sparkContext - existing SparkContext
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(SparkConf conf, Duration batchDuration)
conf - an org.apache.spark.SparkConf object specifying Spark parameters
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
master - cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
appName - a name for your job, to display on the cluster web UI
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf)
path - Path to the directory that was specified as the checkpoint directory
hadoopConf - Optional configuration object, if necessary for reading from HDFS-compatible filesystems

public static int DEFAULT_CLEANER_TTL()
public static <K,V> PairDStreamFunctions<K,V> toPairDStreamFunctions(DStream<scala.Tuple2<K,V>> stream, scala.reflect.ClassTag<K> kt, scala.reflect.ClassTag<V> vt, scala.math.Ordering<K> ord)
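toPairDStreamFunctions is rarely called by hand. In the Scala API it serves as a conversion that adds key-value operations such as reduceByKey to a DStream of pairs. The sketch below assumes a Spark release in which this conversion is an implicit on the StreamingContext companion object (as the static method listed here suggests), so it is brought into scope with a wildcard import. The socket source, port, and object name are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Assumption: this import brings the implicit DStream[(K, V)] => PairDStreamFunctions[K, V]
// conversion into scope on releases where it lives on the StreamingContext object.
import org.apache.spark.streaming.StreamingContext._

object PairFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("PairFunctionsSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source: split each received line into words and pair each word with 1.
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // reduceByKey is defined on PairDStreamFunctions and reached through the conversion.
    val counts = pairs.reduceByKey(_ + _)

    // The same operation with the conversion written out explicitly.
    val explicitCounts = StreamingContext.toPairDStreamFunctions(pairs).reduceByKey(_ + _)

    counts.print()
    explicitCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The ClassTag and Ordering parameters in the signature are supplied by the Scala compiler, which is why neither call passes them.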
public static StreamingContext getOrCreate(String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the StreamingContext will be created by calling the provided creatingFunc.
checkpointPath - Checkpoint directory used in an earlier StreamingContext program
creatingFunc - Function to create a new StreamingContext
hadoopConf - Optional Hadoop configuration, if necessary for reading from the file system
createOnError - Optional; whether to create a new StreamingContext if there is an error in reading checkpoint data. By default, an exception will be thrown on error.

public static scala.Option<String> jarOfClass(Class<?> cls)
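The two static helpers above are often used together in a driver program. The following is a sketch under stated assumptions rather than a reference implementation: the checkpoint URL, the socket source, and the names `RecoverableSketch` and `createContext` are hypothetical, and the master URL is assumed to be supplied externally (for example by the launcher). hadoopConf and createOnError are left at their Scala defaults.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableSketch {
  // Hypothetical checkpoint location on a fault-tolerant file system.
  val checkpointDir = "hdfs://namenode:8020/tmp/streaming-checkpoint"

  // Invoked by getOrCreate only when no checkpoint data exists under checkpointDir.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("RecoverableSketch")
      // jarOfClass returns an Option; toSeq yields an empty list when no JAR is found.
      .setJars(StreamingContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir) // enable periodic checkpointing for driver recovery

    // Define the DStream graph inside this function so that it is part of the
    // checkpointed state; host and port are placeholders.
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recreate the context from checkpoint data if present, otherwise build a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

On a first run the checkpoint directory is empty, so createContext is called. On a restart after a driver failure the context is rebuilt from the checkpoint and createContext is skipped.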
public boolean isCheckpointPresent()
public SparkContext sc()
public SparkConf conf()
public SparkEnv env()
public org.apache.spark.streaming.DStreamGraph graph()
public String checkpointDir()
public Duration checkpointDuration()
public org.apache.spark.streaming.scheduler.JobScheduler scheduler()
public org.apache.spark.streaming.ContextWaiter waiter()
public org.apache.spark.streaming.ui.StreamingTab uiTab()
public org.apache.spark.streaming.StreamingContext.StreamingContextState$ StreamingContextState()
public scala.Enumeration.Value state()
public SparkContext sparkContext()
public void remember(Duration duration)
duration - Minimum duration that each DStream should remember its RDDs

public void checkpoint(String directory)
directory - HDFS-compatible directory where the checkpoint data will be reliably stored. Note that this must be a fault-tolerant file system like HDFS.

public <T> ReceiverInputDStream<T> networkStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1)
receiver - Custom implementation of Receiver

public <T> ReceiverInputDStream<T> receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$2)
receiver - Custom implementation of Receiver

public <T> ReceiverInputDStream<T> actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy, scala.reflect.ClassTag<T> evidence$3)
props - Props object defining creation of the actor
name - Name of the actor
storageLevel - RDD storage level. Defaults to memory-only.
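A sketch of how actorStream might be used, assuming a Spark 1.x release in which the `ActorHelper` trait is available in `org.apache.spark.streaming.receiver` and Akka is on the classpath. The actor class `LineActor`, the actor name `LineReceiver`, and the local master are hypothetical. storageLevel and supervisorStrategy are left at their defaults.

```scala
import akka.actor.{Actor, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical actor: every String message it receives is handed to Spark via store().
class LineActor extends Actor with ActorHelper {
  def receive = {
    case line: String => store(line)
  }
}

object ActorStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ActorStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // props describes how to create the actor; the second argument is the actor's name.
    val lines = ssc.actorStream[String](Props[LineActor], "LineReceiver")
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Some other component still has to send String messages to the actor. How that sender locates the actor depends on the deployment and is not shown here.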
public ReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
Create an input stream from TCP source hostname:port. The received data is interpreted as \n-delimited lines.
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
storageLevel - Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)

public <T> ReceiverInputDStream<T> socketStream(String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$4)
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
converter - Function to convert the byte stream to objects
storageLevel - Storage level to use for storing the received objects

public <T> ReceiverInputDStream<T> rawSocketStream(String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$5)
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
storageLevel - Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(String directory, scala.reflect.ClassTag<K> evidence$6, scala.reflect.ClassTag<V> evidence$7, scala.reflect.ClassTag<F> evidence$8)
directory - HDFS directory to monitor for new files

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<F> evidence$11)
directory - HDFS directory to monitor for new files
filter - Function to filter paths to process
newFilesOnly - Should process only new files and ignore existing files in the directory

public DStream<String> textFileStream(String directory)
directory - HDFS directory to monitor for new files

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$12)
queue - Queue of RDDs
oneAtATime - Whether only one RDD should be consumed from the queue in every interval

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$13)
queue - Queue of RDDs
oneAtATime - Whether only one RDD should be consumed from the queue in every interval
defaultRDD - Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty

public <T> DStream<T> union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$14)
public <T> DStream<T> transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$15)
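The union and transform methods above can be exercised without any external source by feeding them from queueStream. The sketch below is illustrative only: the queue contents, the five-second run time, and all names are assumptions chosen for the example.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

object UnionTransformSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UnionTransformSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Two in-memory queues stand in for real input sources.
    val evensQueue = new mutable.Queue[RDD[Int]]()
    val oddsQueue = new mutable.Queue[RDD[Int]]()
    val evens: DStream[Int] = ssc.queueStream(evensQueue, oneAtATime = true)
    val odds: DStream[Int] = ssc.queueStream(oddsQueue, oneAtATime = true)

    // union: a single DStream whose batches contain the elements of both inputs.
    val all = ssc.union(Seq(evens, odds))
    all.print()

    // transform: the function receives one RDD per input DStream, in the order given,
    // together with the batch time, and returns the RDD for the new DStream.
    val scaled = ssc.transform[Int](Seq(evens, odds), (rdds: Seq[RDD[_]], time: Time) => {
      val left = rdds(0).asInstanceOf[RDD[Int]]
      val right = rdds(1).asInstanceOf[RDD[Int]]
      left.union(right).map(_ * 10)
    })
    scaled.print()

    // Queue a few single-element RDDs; one is consumed per stream per batch interval.
    for (i <- 0 until 3) {
      evensQueue += ssc.sparkContext.makeRDD(Seq(2 * i))
      oddsQueue += ssc.sparkContext.makeRDD(Seq(2 * i + 1))
    }

    ssc.start()
    ssc.awaitTermination(5000) // wait up to 5 seconds (timeout is in milliseconds)
    ssc.stop(stopSparkContext = true)
  }
}
```

The casts inside the transform function are needed because the function is typed over `Seq[RDD[_]]`. The caller is responsible for knowing which element type each position holds.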
public void addStreamingListener(StreamingListener streamingListener)
Add a StreamingListener object for receiving system events related to streaming.

public void start()
public void awaitTermination()
public void awaitTermination(long timeout)
timeout - time to wait in milliseconds

public void stop(boolean stopSparkContext)
stopSparkContext - Stop the associated SparkContext or not

public void stop(boolean stopSparkContext, boolean stopGracefully)
stopSparkContext - Stop the associated SparkContext or not
stopGracefully - Stop gracefully by waiting for the processing of all received data to be completed
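To show how awaitTermination(timeout) and the two-argument stop fit together, here is a short sketch of a bounded run followed by a graceful shutdown. The 30-second run time, the socket source, and the object name are assumptions made for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GracefulStopSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source and output operation.
    ssc.socketTextStream("localhost", 9999).print()

    ssc.start()

    // Run for a bounded time: the timeout is given in milliseconds.
    ssc.awaitTermination(30 * 1000L)

    // stopSparkContext = true also shuts down the underlying SparkContext;
    // stopGracefully = true waits until all received data has been processed.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

Passing stopSparkContext = false instead keeps the SparkContext alive, so it can be handed to the StreamingContext(SparkContext, Duration) constructor to build a fresh streaming context.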