IBM InfoSphere Streams Version 4.1.0

Operator BloomFilter


The BloomFilter operator detects duplicate tuples in a memory-efficient way.

Although the operator is a detector and not a filter, the name BloomFilter is kept to indicate that the underlying mathematics belong to the well-known Bloom filter algorithm.

The BloomFilter operator neither suppresses duplicate tuples nor splits the input stream into unique and duplicate output streams. Instead, it flags each output tuple as either unique or duplicate.

The BloomFilter operator is used in scenarios that require duplicate detection for millions of unique tuples over a period of days.

The advantage of the BloomFilter operator over the DeDuplicate operator is a smaller memory footprint.

False positives are an occasional side effect of this compression: a tuple is marked as a duplicate even though it is unique. You configure this operator with the number of expected unique tuples (N) and the probability of false positives (P). The more unique tuples you expect and the lower the probability you configure, the more memory the operator requires.

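The following formula calculates the required memory in bytes, where M = -N * ln(P) / (ln(2))^2 is the theoretical size of the bit array in bits:

bytes = 2^ceil(log2(ceil(M/8)))

In other words, M is rounded up to whole bytes, and the byte count is then rounded up to the next power of two.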

For example, with 1,000,000,000 (1E+9) expected unique tuples (N) and a maximum probability value of 1E-14 (P) for false positives, 8 GB memory is required.
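As a quick check of the formula, the following minimal Python sketch computes the byte count from N and P. It only restates the documented formula and is not the operator's C++ implementation; the helper name required_bytes is hypothetical. The power-of-two rounding uses the integer bit_length() method instead of a floating-point log2, which gives the same result for positive integers.

```python
import math


def required_bytes(n: int, p: float) -> int:
    """Bytes for the bit array according to the documented formula (hypothetical helper)."""
    if n <= 0 or not (0.0 < p < 1.0):
        raise ValueError("n must be > 0 and p must satisfy 0 < p < 1")
    m_bits = -n * math.log(p) / (math.log(2) ** 2)  # theoretical bit count M
    min_bytes = math.ceil(m_bits / 8)               # ceil(M/8)
    # Round up to the next power of two, i.e. 2^ceil(log2(min_bytes))
    return 1 << (min_bytes - 1).bit_length()


print(required_bytes(1_000_000_000, 1e-14))  # 8589934592 bytes = 2^33 = 8 GB
print(required_bytes(1000, 0.1))             # 1024 bytes
```

Both values agree with this documentation: 8 GB for the example above and 1024 bytes for the startup output example with N = 1000 and P = 0.1.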

The BloomFilter requires a hash value as part of each input tuple. These hash values are used to detect duplicate tuples. The hash value must be calculated outside of the BloomFilter by using either the recommended com.ibm.streams.teda.utility::sha2hashXXX() functions, for example com.ibm.streams.teda.utility::sha2hash160(rstring), or another hash function that provides a hexadecimal string representation of sufficient length for the hash value.

During the startup phase of the BloomFilter operator, the size of the required memory and other useful information are printed to the standard output (stdout) of the processing element. This output also contains a recommendation for how to calculate the hash. It is recommended that you review it together with the hashAttribute parameter documentation.

To view this output in the Streams Explorer of InfoSphere® Streams Studio, right-click a job and select Get Job Logs. Alternatively, you can use the streamtool getlog or streamtool viewlog command to view the log files.

Example

- number of expected uniques (N): 1000
- false positive probability (P): 0.100000000
- bit array (bits, configured)  : 4793
- bit array (bits, reality) (M) : 8192
- bit array (bytes, reality)    : 1024 (=0.0 MB)
- bit array (address bits)      : 13
- number of hash functions (K)  : 2 (down from: 6)
- hash width per hash function  : 4 (characters)
- exact probability for N and K : 0.046925323
- minimum/used hash bits        : 32 (=8 characters)
- recommended hash bits         : 160 (=40 characters)
- recommended hash assignment   : sha2hash160(key)
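The following Python sketch reproduces the numbers in this example. The bit array sizing follows the documented memory formula. How the operator chooses K is not documented here; the sketch assumes, because it matches the printed values, that K starts from the classic optimum (M/N) * ln(2) for the real bit array size and is then lowered to the smallest value whose false positive probability (1 - e^(-K*N/M))^K still does not exceed P, and that each hash function consumes ceil(address bits / 4) hexadecimal characters. The function describe and its dictionary keys are hypothetical.

```python
import math


def describe(n: int, p: float) -> dict:
    """Recompute the startup figures under the stated assumptions (hypothetical)."""
    configured_bits = math.ceil(-n * math.log(p) / math.log(2) ** 2)
    real_bytes = 1 << (math.ceil(configured_bits / 8) - 1).bit_length()
    real_bits = real_bytes * 8                    # shown as M in the output
    address_bits = real_bits.bit_length() - 1     # real_bits is a power of two

    def fp_rate(k: int) -> float:
        # Standard Bloom filter estimate (1 - e^(-K*N/M))^K
        return (1 - math.exp(-k * n / real_bits)) ** k

    k_optimal = max(1, round(real_bits / n * math.log(2)))
    # Assumption: lower K to the smallest value that still meets P
    k = next((k for k in range(1, k_optimal + 1) if fp_rate(k) <= p), k_optimal)
    width = math.ceil(address_bits / 4)           # hex characters per hash function
    return {
        "configured_bits": configured_bits,
        "real_bits": real_bits,
        "real_bytes": real_bytes,
        "address_bits": address_bits,
        "k": k,
        "k_optimal": k_optimal,
        "hash_width_chars": width,
        "exact_probability": fp_rate(k),
        "used_hash_bits": k * width * 4,
    }


info = describe(1000, 0.1)
# Matches the example: K = 2 (down from 6), exact probability ~ 0.046925323
print(info["k"], info["k_optimal"], round(info["exact_probability"], 9))
```

With fewer hash functions, fewer hash characters are needed per tuple (32 bits instead of 96), at the cost of an exact probability that is still well below the configured 0.1.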

If the number of stored unique tuples (metric nUniquesStored) reaches the number of configured unique tuples (metric nUniquesConfigured), this operator is considered to be full (100%). The following formula calculates the fill level:

fill level = (nUniquesStored / nUniquesConfigured) * 100

You can configure the operator to send event tuples when the fill level reaches certain thresholds. If the fill level rises above 100%, the probability of false positives increases dramatically. Therefore, the application administrator must be aware of high fill levels, and the events can be used to raise alarms.

To prevent the fill level from reaching 100%, use one of the following two strategies, or combine them:

  1. The BloomFilter operator supports partitioning and automatic eviction of partitions. For more details, see the partitionBy, partitionCount, and searchAllPartitions parameters. This functionality is available in InfoSphere® Streams Version 4.1 and later.
  2. Use the optional command input port to clear, write, and read the state of the BloomFilter operator whenever needed. This functionality is called controlled checkpointing. The following section sketches how to evict outdated data from the BloomFilter operator.

Sketched Eviction Approach Outside Of The BloomFilter Operator

To evict outdated or unneeded data from the BloomFilter operator, complete the following steps:

  • During normal processing (unique and duplicate tuple detection), store all hash values and their time information in a database or file system. External storage is required because storing all hash values requires a huge amount of memory.
  • Remove old or unneeded data from the BloomFilter.
    1. Stop the flow of data to the data input port.
    2. To reset the operator, send the clear command to the command input port.
    3. Send all valid hash values, which are stored in a database or in the file system, to the operator to train it with the reduced set of hash values.
    4. Enable the normal data flow.
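The eviction steps above can be simulated with a toy Bloom filter in Python. This is a hypothetical sketch, not the operator: the class ToyBloomFilter, the way it cuts K address slices of ceil(address bits / 4) hex characters from the hash, and the day-based cutoff are assumptions made for illustration. Python's hashlib.sha256 stands in for the com.ibm.streams.teda.utility::sha2hashXXX() functions, and the list log plays the role of the external database or file system.

```python
import hashlib


class ToyBloomFilter:
    """Hypothetical stand-in for the operator's bit array (not its real code)."""

    def __init__(self, address_bits: int, k: int):
        self.size = 1 << address_bits           # number of bits, a power of two
        self.k = k                              # number of hash functions
        self.width = -(-address_bits // 4)      # hex characters per address
        self.bits = bytearray(self.size // 8)

    def _addresses(self, hex_hash: str):
        # Assumption: hash function i uses the i-th slice of `width` hex characters
        for i in range(self.k):
            chunk = hex_hash[i * self.width:(i + 1) * self.width]
            yield int(chunk, 16) % self.size

    def check_and_add(self, hex_hash: str) -> bool:
        """Return True if all bits were already set (duplicate), then set them."""
        duplicate = True
        for addr in self._addresses(hex_hash):
            byte, mask = addr >> 3, 1 << (addr & 7)
            if not self.bits[byte] & mask:
                duplicate = False
                self.bits[byte] |= mask
        return duplicate

    def clear(self):
        # Equivalent of sending the clear command
        self.bits = bytearray(self.size // 8)


def sha(key: str) -> str:
    return hashlib.sha256(key.encode()).hexdigest()


bf = ToyBloomFilter(address_bits=13, k=2)
log = []  # external store of (hash, day) pairs, kept for every tuple

# Normal processing: detect uniques/duplicates and record every hash value
for day, key in [(1, "a"), (1, "b"), (2, "c"), (3, "a")]:
    h = sha(key)
    bf.check_and_add(h)
    log.append((h, day))

# Eviction: (1) data flow stopped, (2) clear, (3) retrain with hashes from day 2 on
bf.clear()
for h, day in log:
    if day >= 2:
        bf.check_and_add(h)
# (4) Resume: "a" and "c" are still known; "b" now tests as unique
# unless it happens to collide with the retained bits (a false positive)
```

Step 3 replays only the retained hash values, so a hash that was seen solely before the cutoff ("b" here) is forgotten, which is the purpose of the eviction.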

Exceptions

Under the following conditions, the BloomFilter raises SPL runtime exceptions during the startup phase:

  • The numberOfExpectedUniques parameter is 0.
  • The probability parameter is less than or equal to 0, or greater than or equal to 1.
  • The partitionCount parameter is 0.
  • The address range requires more than 64 address bits.
  • The BloomFilter operator requires more memory than is available.

If the faultHandling parameter is set to strict, the BloomFilter raises an SPL runtime exception while it processes tuples under the following conditions:

  • The received hash value is too short.
  • The received hash value contains invalid characters.
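A minimal Python sketch of these two checks and of the two faultHandling modes follows. It is an assumption-based illustration: the function validate_hash is hypothetical, the minimum length would come from the "minimum/used hash bits" line of the startup output (8 characters in the example earlier in this section), and it is assumed that both lowercase and uppercase hexadecimal digits are accepted and that the whole string is checked. RuntimeError stands in for the SPL runtime exception.

```python
import hashlib
import logging
import string

HEX_DIGITS = frozenset(string.hexdigits)  # 0-9, a-f, A-F (assumed to be accepted)


def validate_hash(hex_hash: str, min_chars: int, fault_handling: str = "strict") -> bool:
    """Return True if the hash is usable; otherwise raise (strict) or log (permissive)."""
    if len(hex_hash) < min_chars:
        problem = f"hash value too short: {len(hex_hash)} < {min_chars} characters"
    elif not set(hex_hash) <= HEX_DIGITS:
        problem = "hash value contains invalid characters"
    else:
        return True
    if fault_handling == "strict":
        raise RuntimeError(problem)  # stands in for the SPL runtime exception
    logging.warning(problem)  # permissive: log and continue with the next tuple
    return False


good = hashlib.sha256(b"some key").hexdigest()  # 64 hex characters
print(validate_hash(good, 8))                   # True
print(validate_hash("a3f0", 8, "permissive"))   # False, and a warning is logged
```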

Behavior in a consistent region and checkpointing

The BloomFilter operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region.

The BloomFilter operator also supports periodic checkpointing that is enabled with the checkpoint configuration clause, for example:

config
	checkpoint : periodic(5.0);
	restartable : true;

The BloomFilter operator can have a large operator state. Checkpointing a large operator state adds significant overhead to both the streaming application and the backend checkpoint store: serializing and writing the state to persistent storage takes substantial time and stalls normal processing, and the checkpointed state data consumes a lot of storage space and I/O bandwidth of the backend checkpoint store. Test checkpointing and recovery of your application before you move it into production. For a large operator state, you might want to increase the drainTimeout and resetTimeout parameters in the @consistent annotation.

During a reset, the BloomFilter operator does not send events. Therefore, the fill level can change during the reset without being signaled.

Summary

Ports
This operator has 2 input ports and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 10 parameters.

Required: numberOfExpectedUniques, probability, hashAttribute

Optional: faultHandling, commandAttribute, argumentAttribute, alarmThresholds, partitionBy, partitionCount, searchAllPartitions

Metrics
This operator reports 4 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The first input port is the data input port. It receives tuples and verifies whether they are unique or duplicate tuples. Tuples that arrive on this port result in output tuples sent to the first output port.

Any input schema is allowed.

The data input port must have an rstring attribute with a hash value (hexadecimal string). This hash value determines whether the tuple is unique or a duplicate. For more information, see the hashAttribute parameter.

Input attributes are automatically forwarded to the data output port, which is the first output port.

If an incoming data tuple is unique, the unique tuple count (see the nUniquesStored metric) is incremented. If alarm thresholds are configured, the fill level is then recalculated. If the fill level crosses an alarm threshold for which no event has been sent yet, an event tuple is sent to the status output port.


Ports (1)

The second input port is the command input port. Command tuples on this port lead to response tuples on the status output port. Commands that are sent to this port control the BloomFilter’s internal state.

If the command input port exists, the status output port and the commandAttribute parameter are mandatory.

This port accepts all input schemas.

The command input port must have an rstring attribute that holds the command string (see the commandAttribute parameter) and can have an optional rstring attribute that holds the command argument (see the argumentAttribute parameter). If the argument attribute is omitted, the read and write commands are not supported.

Supported commands are:

  • status - Requests the current status of the BloomFilter
  • clear - Resets the BloomFilter's internal state
  • read - Reads the internal state of the BloomFilter from a file (see argument below)
  • write - Writes the internal state of the BloomFilter to a file (see argument below)

The following commands require an argument:

  • read: The rstring input attribute that is specified with the argumentAttribute parameter holds a file name. If the file name is a relative path, it is relative to the data directory.
  • write: The rstring input attribute that is specified with the argumentAttribute parameter holds a file name. If the file name is a relative path, it is relative to the data directory.

The following error conditions can occur:

  • If either the read or write command is received without a valid file name (argumentAttribute not set, empty filename provided, provided filename does not point to a file), the error condition is traced, and the command fails.
  • If either the read or write command is received and any I/O problem occurs, for example unable to open a file, writing a protected file, file does not exist, or failing to create the file, the error condition and further details are traced, and the command fails.
  • If the read command is run but the base header cannot be read, an invalid header ID or header version is detected, or the header extension cannot be read, the error conditions and further details are traced, and the command fails.
  • If the read command is run but the file does not match the current configuration of the numberOfExpectedUniques and probability parameters, the error conditions and further details are traced, and the command fails.
  • If the read command is run but an invalid header with a wrong partition count or a wrong bit array size is detected, the error conditions and further details are traced, and the command fails.
  • If the read command cannot read the size or content of the partition information, the command fails and the error condition is traced.
  • If the read command cannot read the memory block, the command fails, the error condition is traced, and the operator state is reset. This reset means that unique tuples are not stored.
  • If the write command is run but the base header or the header extension cannot be written, the error conditions and further details are traced, and the command fails.
  • If the write command is run but the size or content of the partition information cannot be written, the error conditions and further details are traced, and the command fails.
  • If the write command cannot write the memory block, the command fails and the error condition is traced.
  • If the write command attempts to access the data directory, but the data directory has not been specified, the command fails and the error condition is traced.
  • If the clear command detects an invalid number of expected unique tuples or an invalid probability in a checkpoint, the error conditions and further details are traced, and the operator aborts.
  • If the clear command detects an invalid bit array size or an invalid number of bytes for the bit array in a checkpoint, the error conditions and further details are traced, and the operator aborts.
  • If the clear command detects an invalid version in a checkpoint, the error conditions and further details are traced, and the operator aborts.
  • If the command is not recognized, the error condition is traced, and the command fails.

If alarm thresholds are configured and a read command changes the current threshold, an event tuple that indicates the new threshold is generated.

Input attributes are automatically forwarded to the status output port, which is the second output port.


Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes. Attributes not assigned in the output clause will be automatically assigned from the attributes of the input ports that have the same name and type. If there is no such input attribute, an error is reported at compile-time.
Output Functions
DataCustomOutputFunctions
<any T> T AsIs(T value)

Returns the original argument.

boolean Duplicate()

Returns true if the tuple is evaluated to be a duplicate, which includes false positives. This function is the opposite of the Unique() function.

boolean Unique()

Returns true if the tuple is unique. This function is the opposite of the Duplicate() function.

enum { unknown, unique, duplicate } Result()

Returns the result of the BloomFilter operation:

  • unique: The tuple is evaluated to be unique.
  • duplicate: The tuple is evaluated to be a duplicate.
  • unknown: The tuple cannot be evaluated; therefore, the result is unknown.

The unknown result occurs only if partitioning is used. The partitionBy expression is evaluated for the tuple; if its value indicates that a partition would have to be created that would be evicted immediately, the tuple cannot be evaluated. If this custom output function is used, such a tuple leaves the BloomFilter operator with the unknown result; otherwise, the tuple is dropped and the nDropped metric is incremented.

This function combines the results of the Unique() and the Duplicate() functions.

ControlCustomOutputFunctions
<any T> T AsIs(T value)

Returns the original argument.

enum { response, event } Kind()

Returns response if a command was run or event if an alarm threshold was crossed. For more information, see the BloomFilterTypes.Kind SPL type.

boolean Succeeded()

Returns true if the command was run successfully. This function is the opposite of the Failed() function.

boolean Failed()

Returns true if the command fails. This function is the opposite of the Succeeded() function.

float64 Threshold()

Returns the active threshold value.

float64 FillLevel()

Returns the ratio (as a percentage) between the stored and configured unique tuples.

If partitioning is enabled, it returns the fill level of the partition that has the highest fill level.

uint64 UniqueTupleCount()

Returns the number of stored unique tuples.

If partitioning is enabled, it returns the number of stored unique tuples of the partition that has the highest number of stored unique tuples.

uint64 DroppedTupleCount()

Returns the number of dropped tuples. Tuples are dropped if they belong to a partition that is already evicted or that would be evicted immediately after its creation.

The function returns 0 if partitioning is disabled.

Ports (0)

The first output port is the data output port.

For each data input tuple, a corresponding data output tuple is sent to the data output port.


Ports (1)

The second output port is the status output port. If either the optional command input port exists or alarm thresholds are configured (see alarmThresholds parameter), this port is mandatory.

For each command input tuple, a corresponding response tuple is sent to the status output port.

For each data input tuple that causes the active alarm threshold to change, an event tuple is sent to the status output port.

The following predefined SPL types can be used in any application:


Parameters

This operator supports 10 parameters.
faultHandling

Specifies how to handle faults. Valid values are strict, which means that a runtime problem (like a hash value that is too short) causes a runtime error and termination of the operator, and permissive, which means that a runtime problem results in runtime log entries, and the BloomFilter tries to continue to the next tuple. The default fault handling mode is strict.


numberOfExpectedUniques

Specifies the number of expected unique tuples for which the configured probability of false positives is guaranteed. If partitioning is used, each partition is able to hold this number of expected unique tuples.

A false positive occurs when a tuple is marked as a duplicate even though it is unique.

If the parameter value is 0, the BloomFilter raises an SPL runtime exception during the startup phase.


probability

Specifies the probability of false positives for the configured number of expected unique tuples.

A false positive occurs when a tuple is marked as a duplicate even though it is unique.

This parameter requires a value that is greater than zero and less than one. If you specify a value that is less than or equal to 0 or greater than or equal to 1, the BloomFilter raises an SPL runtime exception during the startup phase.


hashAttribute

Specifies the rstring attribute of the data input port that holds a hash value (hexadecimal string). The hash value can be calculated in an upstream operator with the com.ibm.streams.teda.utility::sha2hashXXX() functions, for example com.ibm.streams.teda.utility::sha2hash160(rstring).

During the startup phase of the BloomFilter operator, the size of the required memory and other useful information are printed to the standard output (stdout) of the processing element. This information contains a recommendation for calculating the hash.

In the following example, the BloomFilter requires 32 hash bits. Because the operator knows which SHA-2 hash functions are supported and how many bits each of them returns, it recommends a hash assignment that provides a minimal but sufficient number of bits. In this case, it recommends the sha2hash160() function, which returns 160 bits, five times more than necessary:

- minimum/used hash bits        : 32 (=8 characters)
- recommended hash bits         : 160 (=40 characters)
- recommended hash assignment   : sha2hash160(key)

commandAttribute

Specifies the rstring attribute of the optional command input port that holds the command that is described in “Input Ports.”

If you specify a value for this parameter, the optional command input and status output ports are mandatory. If the optional command input port exists, this parameter is mandatory.


argumentAttribute

Specifies the rstring attribute of the optional command input port that holds the command argument as described in “Input Ports.”

If this parameter is specified, the optional command input and status output ports are mandatory. If this parameter is omitted and the command input port exists, the read and write commands are not supported.


alarmThresholds

Specifies one or more fill level thresholds (as a percentage) that, if reached, generate an event. These events are used to observe the operator's fill level, for example to inform an administrator if the fill level reaches a critical value.

If a value is specified, the optional status output port is mandatory.

The BloomFilter operator counts how many unique tuples are stored. If the ratio between the stored and configured number of unique tuples exceeds one or more of the configured threshold values [%], an event tuple is sent to the status output port.

Negative alarm thresholds are silently dropped. The 0.0 alarm threshold is always present.

For example, with the following configuration, an event tuple is generated when the fill level reaches 25%, 50%, 75%, or 100%, or when it drops to 0% (clear command):

alarmThresholds: 25.0, 50.0, 75.0, 100.0;

If partitioning is enabled, each partition has its own fill level. For the determination whether an event tuple needs to be sent, the highest fill level is used.
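The threshold behavior can be modeled in a few lines of Python. The sketch is hypothetical and based on the description above: it assumes that an event is emitted whenever the active threshold (the highest configured threshold that the fill level has reached) changes, so that a jump across several thresholds produces a single event, and a drop to 0% after a clear command produces an event for the 0.0 threshold. The class name ThresholdTracker is invented for illustration.

```python
import bisect
from typing import Iterable, List, Optional


class ThresholdTracker:
    """Hypothetical model: an event fires whenever the active threshold changes."""

    def __init__(self, thresholds: Iterable[float], n_configured: int):
        # Negative thresholds are dropped; the 0.0 threshold is always present
        self.thresholds: List[float] = sorted({t for t in thresholds if t >= 0.0} | {0.0})
        self.n_configured = n_configured
        self.active = 0.0

    def fill_level(self, stored_per_partition: List[int]) -> float:
        # fill level = (stored / configured) * 100 per partition;
        # with partitions, the partition with the highest fill level is used
        return max(stored_per_partition) / self.n_configured * 100.0

    def update(self, stored_per_partition: List[int]) -> Optional[float]:
        """Return the new active threshold if it changed (event), else None."""
        level = self.fill_level(stored_per_partition)
        # Highest configured threshold that the fill level has reached
        new_active = self.thresholds[bisect.bisect_right(self.thresholds, level) - 1]
        if new_active == self.active:
            return None
        self.active = new_active
        return new_active


tracker = ThresholdTracker([25.0, 50.0, 75.0, 100.0, -5.0], n_configured=1000)
events = [tracker.update([n]) for n in (100, 250, 260, 800, 0)]
# events == [None, 25.0, None, 75.0, 0.0]  (800 jumps past 50% in one step)

partitioned = ThresholdTracker([25.0, 50.0, 75.0, 100.0], n_configured=1000)
print(partitioned.update([300, 510]))  # 50.0: the fuller partition decides
```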


partitionBy

Specifies an expression that is used to partition the input tuples. All parameter configurations apply to each partition independently; for example, each partition holds as many unique tuples as specified with the numberOfExpectedUniques parameter.

The expression must return a sortable value, typically a numeric or string value such as a date/time string. As soon as the number of active partitions exceeds the count that is specified with the partitionCount parameter, the partition with the minimum expression value is evicted.

The following types are allowed for the partitionBy parameter:

  • int8, int16, int32, int64
  • uint8, uint16, uint32, uint64
  • float32, float64, decimal32, decimal64, decimal128
  • rstring, ustring
  • timestamp

partitionCount

Specifies the maximum number of partitions. As soon as the number of active partitions exceeds this count, the partition with the minimum value of the partitionBy expression is evicted.


searchAllPartitions

Specifies whether the unique/duplicate detection algorithm evaluates all partitions or only the partition that is selected with the partitionBy expression.

If set to true, the algorithm evaluates all partitions. If the tuple is evaluated to be unique in the partition that is selected with the partitionBy expression, the number of stored unique tuples is increased for this partition, even if the tuple is marked as a duplicate because of another partition.

The probability of false positives increases with the fill level of a partition. For badly chosen values of partitionBy and numberOfExpectedUniques, this setting can lead to more false positives than a configuration without partitions.

If set to false, the algorithm evaluates only the partition that is selected with the partitionBy expression.
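The interplay of partitionBy, partitionCount, and searchAllPartitions can be illustrated with a small Python model. It is a hypothetical sketch inferred from the parameter descriptions, not the operator's code: exact Python sets replace the per-partition bit arrays (so the model has no false positives), the partition keys are ISO date strings, and the class PartitionedDetector is invented. A tuple whose partition would be evicted immediately yields unknown; in the operator, such a tuple is dropped and counted in the nDropped metric unless the Result() custom output function is used.

```python
from typing import Dict, Set


class PartitionedDetector:
    """Hypothetical model of partitionBy, partitionCount, searchAllPartitions."""

    def __init__(self, partition_count: int, search_all: bool = False):
        self.partition_count = partition_count
        self.search_all = search_all
        self.partitions: Dict[str, Set[str]] = {}  # partition key -> stored hashes
        self.n_unknown = 0

    def process(self, key: str, hex_hash: str) -> str:
        if key not in self.partitions:
            full = len(self.partitions) >= self.partition_count
            if full and key < min(self.partitions):
                # The new partition would be the minimum and be evicted at once
                self.n_unknown += 1
                return "unknown"
            self.partitions[key] = set()
            if len(self.partitions) > self.partition_count:
                del self.partitions[min(self.partitions)]  # evict the minimum key
        selected = self.partitions[key]
        unique_here = hex_hash not in selected
        selected.add(hex_hash)  # stored in the selected partition even if found elsewhere
        if self.search_all and any(
            hex_hash in stored
            for k, stored in self.partitions.items() if k != key
        ):
            return "duplicate"
        return "unique" if unique_here else "duplicate"


tuples = [
    ("2024-01-01", "aa11"),
    ("2024-01-02", "bb22"),
    ("2024-01-03", "cc33"),  # third partition: 2024-01-01 is evicted
    ("2024-01-01", "aa11"),  # belongs to an evicted partition
    ("2024-01-03", "bb22"),  # seen before, but in another partition
]

d = PartitionedDetector(partition_count=2)
results = [d.process(k, h) for k, h in tuples]
# results == ["unique", "unique", "unique", "unknown", "unique"]

d_all = PartitionedDetector(partition_count=2, search_all=True)
results_all = [d_all.process(k, h) for k, h in tuples]
# results_all[-1] == "duplicate"
```

The last tuple shows the trade-off: with searchAllPartitions set to true, "bb22" is reported as a duplicate because of the 2024-01-02 partition, yet it is still stored in the 2024-01-03 partition and raises that partition's fill level.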


Code Templates

UseBloomFilter
use com.ibm.streams.teda.utility::BloomFilter;
use com.ibm.streams.teda.utility::BloomFilterTypes;
      

BloomFilter
(
	stream<I, tuple<boolean duplicate>> ${dataStream} as O;
	stream<C, tuple<BloomFilterTypes.Kind kind>, BloomFilterTypes.Response, BloomFilterTypes.Status, BloomFilterTypes.Event> ${statusStream} as S
) as DuplicateDetector = BloomFilter(${inputStream} as I; ${commandStream} as C) 
{
	param
		numberOfExpectedUniques : 1000000ul;
		probability : 0.0000001;
		// Input attributes.
		hashAttribute: ${hash};
		commandAttribute: ${command};
		argumentAttribute: ${argument};
		// Event relevant parameters.
		alarmThresholds: 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0;
	output
		O:
			${duplicate} = Duplicate(); // or: = !Unique()
		S:
			${kind} = Kind(),
			// BloomFilterTypes.Response
			${success} = Succeeded(), // or: = !Failed()
			// BloomFilterTypes.Status
			${fillLevel} = FillLevel(),
			${nUniques} = UniqueTupleCount(),
			// BloomFilterTypes.Event
			${threshold} = Threshold();
}
      

Metrics

memory - Gauge

The amount of memory (in bytes) that is consumed by the BloomFilter operator.

nUniquesConfigured - Gauge

The configured number of unique tuples for which the configured false positive probability is ensured.

nUniquesStored - Gauge

The number of stored unique tuples. If partitioning is enabled, it is the sum of stored unique tuples in all partitions.

nDropped - Gauge

The number of tuples that are dropped because they belong to an already evicted partition or to a partition that would be evicted immediately. This metric is active only if the partitionBy parameter is specified and the Result() custom output function is not used.