public class FileInputDStream<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> extends InputDStream<scala.Tuple2<K,V>>
At each batch interval, the file system is queried for files in the given directory, and newly detected files are selected for that batch. Here, "new" means files that became visible to readers during that time period. Some extra care is needed because a file may become visible some time after it is created. For this purpose, this class remembers the files selected in past batches for a certain duration (the "remember window"), as shown in the figure below.
                     |<----- remember window ----->|
ignore threshold --->|                             |<--- current batch time
                     |____.____.____.____.____.____|
                     |    |    |    |    |    |    |
---------------------|----|----|----|----|----|----|-----------------------> Time
                     |____|____|____|____|____|____|
                            remembered batches
The trailing end of the window is the "ignore threshold": all files whose mod times are less than this threshold are assumed to have already been selected and are therefore ignored. Files whose mod times fall within the remember window are checked against the files that have already been selected. At a high level, this is how new files are identified in each batch: they are the files whose mod times are greater than the ignore threshold and that have not already been considered within the remember window. See the documentation of the method isNewFile for more details.
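The selection rule in the previous paragraph can be sketched as a small predicate. This is a minimal illustration, not Spark's private isNewFile: the class NewFileCheck, its method names, and the use of plain strings and epoch milliseconds are assumptions, and treating a mod time exactly equal to the threshold as ignored is this sketch's choice.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the selection rule described above; not Spark's
// private isNewFile implementation. Times are epoch milliseconds.
final class NewFileCheck {

    private NewFileCheck() {}

    // Trailing edge of the remember window that ends at the current batch time.
    static long ignoreThreshold(long currentBatchTimeMs, long rememberWindowMs) {
        return currentBatchTimeMs - rememberWindowMs;
    }

    // A file is "new" if its mod time is past the ignore threshold and no
    // remembered batch has already selected it. Treating a mod time equal to
    // the threshold as ignored is an assumption of this sketch.
    static boolean isNewFile(String path, long modTimeMs, long ignoreThresholdMs,
                             Set<String> recentlySelected) {
        if (modTimeMs <= ignoreThresholdMs) {
            return false;
        }
        return !recentlySelected.contains(path);
    }

    public static void main(String[] args) {
        Set<String> selected = new HashSet<>();
        selected.add("/data/a.txt");
        long threshold = ignoreThreshold(120_000L, 60_000L); // 60 s
        System.out.println(isNewFile("/data/a.txt", 90_000L, threshold, selected)); // false: already selected
        System.out.println(isNewFile("/data/b.txt", 90_000L, threshold, selected)); // true
        System.out.println(isNewFile("/data/c.txt", 30_000L, threshold, selected)); // false: older than threshold
    }
}
```

The threshold check runs first, so the set of remembered names only has to cover files inside the window.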
This class makes the following assumptions about the underlying file system that it monitors:
- The clock of the file system is assumed to be synchronized with the clock of the machine running the streaming app.
- If a file is to be visible in the directory listings, it must become visible within a certain duration of its mod time. This duration is the "remember window", which is at least 1 minute (see FileInputDStream.MIN_REMEMBER_DURATION). Otherwise, the file will never be selected, because its mod time will already be less than the ignore threshold when it becomes visible.
- Once a file is visible, its mod time cannot change. If it does change (for example, due to appends), the processing semantics are undefined.
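The second assumption can be illustrated with a toy simulation. Everything here is hypothetical (FakeFile, run(), the in-memory file list); it models only the threshold and remembered-set checks from the description above, with 10-second batches and a 60-second remember window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the visibility assumption. FakeFile and run() are
// hypothetical; the real class lists a Hadoop directory instead.
final class VisibilityLagDemo {

    static final class FakeFile {
        final String name;
        final long modTimeMs;   // mod time reported by the file system
        final long visibleAtMs; // first moment the file appears in listings

        FakeFile(String name, long modTimeMs, long visibleAtMs) {
            this.name = name;
            this.modTimeMs = modTimeMs;
            this.visibleAtMs = visibleAtMs;
        }
    }

    // Runs batches at batchMs, 2*batchMs, ... up to endMs and returns the
    // file names in the order they were selected.
    static List<String> run(List<FakeFile> files, long batchMs, long rememberMs, long endMs) {
        List<String> order = new ArrayList<>();
        // Never forgetting names is a simplification; it gives the same result
        // here because files older than the threshold are rejected anyway.
        Set<String> selected = new HashSet<>();
        for (long t = batchMs; t <= endMs; t += batchMs) {
            long threshold = t - rememberMs;
            for (FakeFile f : files) {
                boolean visible = f.visibleAtMs <= t;
                // selected.add returns true only for a name not seen before
                if (visible && f.modTimeMs > threshold && selected.add(f.name)) {
                    order.add(f.name);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<FakeFile> files = List.of(
            new FakeFile("a.txt", 5_000L, 8_000L),    // visible 3 s after mod time
            new FakeFile("b.txt", 15_000L, 100_000L), // visible 85 s after mod time
            new FakeFile("c.txt", 15_000L, 50_000L)); // visible 35 s after mod time
        // 10 s batches with a 60 s remember window: b.txt is never selected.
        System.out.println(run(files, 10_000L, 60_000L, 200_000L)); // [a.txt, c.txt]
    }
}
```

By the time b.txt shows up at 100 s, the ignore threshold has moved to 40 s, past its 15 s mod time, so it is silently skipped in every later batch.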
Modifier and Type | Class and Description |
---|---|
class | FileInputDStream.FileInputDStreamCheckpointData: A custom version of the DStreamCheckpointData that stores names of Hadoop files as checkpoint data. |
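A minimal sketch of the idea behind storing file names as checkpoint data, assuming the checkpoint is a copy of the batch-time-to-file-names map (compare batchTimeToSelectedFiles()). FileNameCheckpoint, snapshot and restoreSelected are hypothetical names. The real class presumably also rebuilds each checkpointed batch's RDD from the stored names; that part is omitted here.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: the checkpoint holds only file names per batch time,
// and recovery rebuilds the "already selected" set from those names.
final class FileNameCheckpoint {

    private FileNameCheckpoint() {}

    // Deep-copies the batch-time -> file-names map so later changes to the
    // live map cannot alter what was checkpointed.
    static TreeMap<Long, String[]> snapshot(Map<Long, String[]> batchTimeToFiles) {
        TreeMap<Long, String[]> copy = new TreeMap<>();
        for (Map.Entry<Long, String[]> e : batchTimeToFiles.entrySet()) {
            copy.put(e.getKey(), e.getValue().clone());
        }
        return copy;
    }

    // After a restart, every checkpointed name counts as already selected.
    static Set<String> restoreSelected(Map<Long, String[]> checkpoint) {
        Set<String> selected = new HashSet<>();
        for (String[] names : checkpoint.values()) {
            for (String name : names) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<Long, String[]> live = new TreeMap<>();
        live.put(1_000L, new String[] {"hdfs:///in/a", "hdfs:///in/b"});
        TreeMap<Long, String[]> ckpt = snapshot(live);
        live.get(1_000L)[0] = "changed";                  // does not affect the checkpoint
        System.out.println(ckpt.get(1_000L)[0]);          // hdfs:///in/a
        System.out.println(restoreSelected(ckpt).size()); // 2
    }
}
```

Storing names rather than file contents keeps the checkpoint small; the data itself can be reread from the file system on recovery.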
Constructor and Description |
---|
FileInputDStream(StreamingContext ssc_, String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<F> evidence$3) |
Modifier and Type | Method and Description |
---|---|
scala.collection.mutable.HashMap<Time,String[]> | batchTimeToSelectedFiles() |
static int | calculateNumBatchesToRemember(Duration batchDuration): Calculate the number of last batches to remember, such that all the files selected in at least the last MIN_REMEMBER_DURATION can be remembered. |
scala.Option<RDD<scala.Tuple2<K,V>>> | compute(Time validTime): Finds the files that were modified since the last time this method was called and makes a union RDD out of them. |
static boolean | defaultFilter(org.apache.hadoop.fs.Path path) |
void | start(): Method called to start receiving data. |
void | stop(): Method called to stop receiving data. |
Methods inherited from class InputDStream: dependencies, isTimeValid, lastValidTime, slideDuration
Methods inherited from class DStream: cache, checkpoint, checkpointDuration, clearCheckpointData, context, count, countByValue, countByValueAndWindow, countByWindow, creationSite, filter, flatMap, foreach, foreach, foreachRDD, foreachRDD, generatedRDDs, generateJob, getCreationSite, getOrCompute, glom, graph, initialize, isInitialized, map, mapPartitions, mustCheckpoint, parentRememberDuration, persist, persist, print, reduce, reduceByWindow, reduceByWindow, register, remember, rememberDuration, repartition, restoreCheckpointData, saveAsObjectFiles, saveAsTextFiles, setContext, setGraph, slice, slice, ssc, storageLevel, transform, transform, transformWith, transformWith, union, updateCheckpointData, validate, window, window, zeroTime
Methods inherited from class Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Logging: initializeIfNecessary, initializeLogging, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public FileInputDStream(StreamingContext ssc_, String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<F> evidence$3)
public static boolean defaultFilter(org.apache.hadoop.fs.Path path)
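This page does not say what defaultFilter accepts. The sketch below ASSUMES it rejects hidden files, i.e. paths whose last component starts with '.'; treat that as an assumption to verify against the source. Paths are plain strings instead of org.apache.hadoop.fs.Path so the example is self-contained, and DefaultFilterSketch is a hypothetical name.

```java
import java.util.function.Predicate;

// Sketch of a default path filter, ASSUMING it rejects hidden files (names
// starting with '.'). Paths are plain strings here instead of
// org.apache.hadoop.fs.Path to keep the example self-contained.
final class DefaultFilterSketch {

    private DefaultFilterSketch() {}

    // Accepts a path unless its last component starts with '.'.
    static boolean defaultFilter(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        return !name.startsWith(".");
    }

    public static void main(String[] args) {
        // A custom filter can build on the default, e.g. to accept only CSV files.
        Predicate<String> base = DefaultFilterSketch::defaultFilter;
        Predicate<String> csvOnly = base.and(p -> p.endsWith(".csv"));
        System.out.println(csvOnly.test("/in/day1.csv"));  // true
        System.out.println(csvOnly.test("/in/.day1.csv")); // false: hidden
        System.out.println(csvOnly.test("/in/day1.json")); // false: wrong suffix
    }
}
```

In the constructor above, the filter argument (a scala.Function1<Path,Object>) plays the role of this predicate.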
public static int calculateNumBatchesToRemember(Duration batchDuration)
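The summary describes calculateNumBatchesToRemember as picking enough batches to cover at least MIN_REMEMBER_DURATION. A minimal sketch, assuming that means rounding MIN_REMEMBER_DURATION / batchDuration up to a whole number and that MIN_REMEMBER_DURATION is 1 minute as stated above. RememberBatches is a hypothetical name and durations are plain milliseconds.

```java
// Sketch of the computation described for calculateNumBatchesToRemember,
// ASSUMING it rounds MIN_REMEMBER_DURATION / batchDuration up to a whole
// number of batches. Durations are in milliseconds.
final class RememberBatches {

    // 1 minute, matching MIN_REMEMBER_DURATION as documented above.
    static final long MIN_REMEMBER_DURATION_MS = 60_000L;

    private RememberBatches() {}

    static int calculateNumBatchesToRemember(long batchDurationMs) {
        if (batchDurationMs <= 0) {
            throw new IllegalArgumentException("batch duration must be positive");
        }
        // Integer ceiling division: the fewest batches that cover the minimum duration.
        return (int) ((MIN_REMEMBER_DURATION_MS + batchDurationMs - 1) / batchDurationMs);
    }

    public static void main(String[] args) {
        for (long batchMs : new long[] {10_000L, 7_000L, 120_000L}) {
            int n = calculateNumBatchesToRemember(batchMs);
            System.out.println(batchMs + " ms -> " + n + " batches, window " + n * batchMs + " ms");
        }
    }
}
```

Because of the rounding, the effective remember window can exceed one minute: 7-second batches give 9 batches, a 63-second window, and a 2-minute batch gives a single batch covering 2 minutes.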
public scala.collection.mutable.HashMap<Time,String[]> batchTimeToSelectedFiles()
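batchTimeToSelectedFiles() exposes a map from batch time to the files selected in that batch. A hypothetical model of how such a map could be kept bounded: record each batch's files, then drop batches that have fallen behind the remember window. SelectedFilesLog and its methods are illustrative names, and keeping a batch exactly at the cutoff is this sketch's assumption.

```java
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical model of the batchTimeToSelectedFiles bookkeeping: file names
// are recorded per batch time and batches older than the remember window are
// dropped. Keeping a batch exactly at the cutoff is an assumption.
final class SelectedFilesLog {

    private final TreeMap<Long, String[]> batchTimeToSelectedFiles = new TreeMap<>();
    private final long rememberMs;

    SelectedFilesLog(long rememberMs) {
        this.rememberMs = rememberMs;
    }

    void record(long batchTimeMs, String[] files) {
        batchTimeToSelectedFiles.put(batchTimeMs, files.clone());
    }

    // Removes every batch strictly older than currentBatchTimeMs - rememberMs.
    // headMap(..., false) is a live view, so clearing it edits the map itself.
    void forgetOld(long currentBatchTimeMs) {
        batchTimeToSelectedFiles.headMap(currentBatchTimeMs - rememberMs, false).clear();
    }

    boolean alreadySelected(String file) {
        for (String[] files : batchTimeToSelectedFiles.values()) {
            if (Arrays.asList(files).contains(file)) {
                return true;
            }
        }
        return false;
    }

    int rememberedBatches() {
        return batchTimeToSelectedFiles.size();
    }

    public static void main(String[] args) {
        SelectedFilesLog log = new SelectedFilesLog(60_000L);
        log.record(10_000L, new String[] {"a.txt"});
        log.record(70_000L, new String[] {"b.txt"});
        log.forgetOld(80_000L); // cutoff 20 s: the 10 s batch is dropped
        System.out.println(log.alreadySelected("a.txt") + " " + log.alreadySelected("b.txt")); // false true
    }
}
```

A sorted map makes pruning a single range operation, since all expired batches sit at the head of the key order.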
public void start()
Method called to start receiving data.
Specified by: start in class InputDStream<scala.Tuple2<K,V>>
public void stop()
Method called to stop receiving data.
Specified by: stop in class InputDStream<scala.Tuple2<K,V>>
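public scala.Option<RDD<scala.Tuple2<K,V>>> compute(Time validTime)
Finds the files that were modified since the last time this method was called and makes a union RDD out of them.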
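The "union RDD" step of compute can be pictured with an in-memory analogue: each newly selected file is read on its own and the per-file results are concatenated in file order. This is only an illustration; a List stands in for an RDD, and UnionOfNewFiles and its readFile parameter are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory analogue of "make a union RDD out of the new files":
// each selected file is read separately and the per-file results are
// concatenated in file order. A List stands in for an RDD here.
final class UnionOfNewFiles {

    private UnionOfNewFiles() {}

    static <T> List<T> union(List<String> newFiles, Function<String, List<T>> readFile) {
        return newFiles.stream()
                .flatMap(path -> readFile.apply(path).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<String>> fakeFs = Map.of(
                "/in/a.txt", List.of("line1", "line2"),
                "/in/b.txt", List.of("line3"));
        System.out.println(union(List.of("/in/a.txt", "/in/b.txt"), fakeFs::get)); // [line1, line2, line3]
    }
}
```

Reading files separately and combining afterwards keeps each file's records together, which mirrors one input RDD per file feeding a single union.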