DATA STREAM- Data streams are unbounded sequences of time-varying data elements. They represent a continuous flow of information in which the most recent data is the most relevant, because it describes the current state of a dynamic system.
DBMS VS DATA STREAMING- A database stores the data before it can be queried, and several different queries can be run many times on the same data. Data streaming processes data on the fly, as soon as it arrives in the system, without storing it. Queries are continuous (they can run forever), and a query cannot be executed more than once on the same data.
CEP ENGINE FAMILIES- Two main dimensions:
-Abstraction: queries are programmed by connecting algebraic operators (map, filter, aggregate, …) or written in an SQL-like language.
-Parallelization: a centralized engine only offers inter-query parallelism (different queries run in parallel). There are 2 types of intra-query parallelism:
-Inter-operator parallelism: each operator of a query can run on a different node. It scales with the number of query operators.
-Intra-operator parallelism: each operator can run on a cluster of nodes. It scales with the event stream volume.
TYPES OF WINDOWS-
Tumbling windows: each element is assigned to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.
Sliding windows: the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows overlap if the slide is smaller than the window size, and in this case an element may be assigned to multiple windows.
Session windows: the session window assigner groups elements by sessions of activity. In contrast to tumbling and sliding windows, session windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when it does not receive elements for a certain period of time (the gap).
Global windows: the global window assigner assigns all elements with the same key to the same single global window.
This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, because the global window has no natural end at which the aggregated elements could be processed.
TYPES OF TIME ASSOCIATED TO EVENTS IN FLINK- There are three notions of time: EventTime, ProcessingTime and IngestionTime. EventTime is the time when the event was created in its producing device. ProcessingTime is the system time of the machine that is executing an operation. IngestionTime is the time assigned to the event when it enters the Flink dataflow.
Example: keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(3))).apply(new SimpleSum()); -> the apply function (SimpleSum, a user-defined window function) is executed when the session of a key receives no data for 3 seconds of event time.
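Assigning an element to tumbling or sliding windows comes down to simple arithmetic on its timestamp. The sketch below is plain Java with no Flink dependency. It rests on a few assumptions: the class and method names are hypothetical, timestamps are non-negative milliseconds, and window offsets are ignored (Flink's assigners also support an offset).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: computes which time windows an event falls into.
final class WindowAssignment {

    // Tumbling window of `size` ms: start of the single window [start, start + size) containing ts.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Sliding windows of `size` ms started every `slide` ms: all windows [start, start + size) containing ts.
    static List<long[]> slidingWindows(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // most recent window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 7_300; // event timestamp in ms
        System.out.println("tumbling 5 s -> start " + tumblingStart(ts, 5_000));
        for (long[] w : slidingWindows(ts, 5_000, 1_000)) {
            System.out.println("sliding 5 s / 1 s -> [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With tumbling 5 s windows, the event at 7300 ms lands only in [5000, 10000). With a 5 s window sliding every 1 s, it lands in five windows, starting at 7000, 6000, 5000, 4000 and 3000 ms.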
KEYBY- keyBy partitions a stream into disjoint streams based on the values of one or more fields of the stream tuples. Each substream can be processed independently by a different instance of the next operator (on the same or a different node).
FLATMAP AND EXAMPLE- The FlatMap function takes an element and transforms it into zero, one, or more elements. The example splits sentences (value) into words (word): for each value (sentence, input tuple), zero or more words can be produced.
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
JOIN EXAMPLE-
sensor1Stream.join(sensor2Stream)
    .where(value -> value.f0)
    .equalTo(value -> value.f0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .apply((JoinFunction) (first, second) -> new Tuple5(first.f0, first.f1, first.f2, second.f1, second.f2))
    .filter((in) -> ...);
Events coming from the two streams are joined on field f0 (value.f0). In every 1-second tumbling window, each pair of tuples (one from each stream) with the same f0 produces a joined tuple. Its first field is the timestamp (long) of the tuple from the first stream, which is equal to the one in the tuple from the second stream. Fields 2 and 3 come from the tuple of the first stream and fields 4 and 5 from the tuple of the second stream. Finally, each new event is filtered.
MAP- Takes one element and produces exactly one element.
FILTER- Evaluates a predicate for each element and outputs those for which the predicate is true.
TUPLES- Tuples encapsulate data. There are 25 predefined tuples: Tuple1, Tuple2, … Tuple25, with 1, 2, … 25 fields (accessed as f0, f1, …).
KEYBY- Some operations require the data to be grouped by a key before the operation is applied. This is expensive in a distributed setting, because tuples have to be repartitioned (shuffled) across the network.
REDUCE- Combines the current element with the last reduced value and emits the new value.
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
AGGREGATIONS- Rolling aggregations (min, max, sum) on a keyed data stream.
WINDOWS- Windows split the stream into “buckets” of finite size on which a computation is applied.
WINDOW CONFIGS-
Trigger- Determines when a window is ready to be processed by the window function. Every window assigner has a default trigger, and trigger() replaces it. (Fire the window function every 100 events: window(...).trigger(CountTrigger.of(100)))
Evictor- Removes elements from a window after the trigger fires and before and/or after the window function is applied. (Keep only 10 elements in the window: window(...).trigger(...).evictor(CountEvictor.of(10)))
Lateness- Used to manage late events. By default, late events are dropped. window(...).allowedLateness(time) accepts late elements up to that amount of time after the watermark has passed the end of the window.
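The trigger and evictor above work together. A count trigger decides when the window function runs, and a count evictor decides which elements it sees. The following plain-Java sketch has no Flink dependency and uses hypothetical names. It assumes that the trigger only fires and does not purge the window, so the kept elements stay for the next firing, and it uses a sum as the window function.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation, not Flink code: one window that fires every `triggerCount` elements
// (like CountTrigger.of(triggerCount)) and, before the window function runs, evicts all but
// the last `keepCount` elements (like CountEvictor.of(keepCount)). The window function is a sum.
final class CountTriggerEvictorSketch {

    static List<Integer> run(int[] elements, int triggerCount, int keepCount) {
        Deque<Integer> buffer = new ArrayDeque<>(); // window contents
        List<Integer> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (int e : elements) {
            buffer.addLast(e);
            if (++sinceLastFire < triggerCount) {
                continue; // trigger not ready yet
            }
            sinceLastFire = 0;
            // Evictor: drop the oldest elements, keeping only the last keepCount
            while (buffer.size() > keepCount) {
                buffer.removeFirst();
            }
            // Window function: sum what is left in the window
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            results.add(sum);
        }
        return results;
    }

    public static void main(String[] args) {
        // fires after 3 and after 6 elements, each time summing only the last 2 elements
        System.out.println(run(new int[] {1, 2, 3, 4, 5, 6}, 3, 2)); // [5, 11]
    }
}
```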
PROCESSING-TIME WINDOWS- Based on the current system time of the machine. 3 types: TumblingProcessingTimeWindows, SlidingProcessingTimeWindows, ProcessingTimeSessionWindows. Most efficient type of window. Non-deterministic: the result depends on the arrival rate of events.
EVENT-TIME WINDOWS- Based on the timestamps of the tuples, which are generated at the source of the data. The progress of time depends on the events, not on the clock (the system waits for events). Watermarks indicate that there are no more events for a given window. Event timestamps are in milliseconds. Types: TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindows.
WATERMARKS- Watermarks carry timestamps indicating that there are no more events for a given window. They flow through the stream. When a watermark reaches an operator, the event time of the operator advances and a new watermark is generated. If the operator receives more than one input stream, the minimum of the input watermarks is taken.
Watermarks can be defined in the stream source: timestamps with the collectWithTimestamp method, watermarks with emitWatermark(new Watermark(...)).
Assigners take a stream and produce a timestamped stream:
-AssignerWithPeriodicWatermarks: emits watermarks periodically (the interval is given in ms).
-AssignerWithPunctuatedWatermarks: emits a watermark based on some property of the incoming events.
-stream.assignTimestampsAndWatermarks(...) with an AscendingTimestampExtractor: timestamps arrive in ascending order, so they are also used as the watermark.
JOBMANAGER- Coordinates the distributed execution.
TASKMANAGER- Executes the tasks. TaskManagers connect to the JobManager, announcing themselves as available, and are assigned work. Each TaskManager is a JVM process and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has task slots (at least one). Each task slot represents a fixed subset of the resources of the TaskManager.
KEYED VS NON-KEYED WINDOWS- Using keyBy(), the stream is split into logical keyed streams. If keyBy() is not called, the stream is not keyed.
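The watermark rules above can be sketched in plain Java. Several details are illustrative assumptions: the fixed out-of-orderness bound used to generate periodic watermarks, the sample timestamps, and all names. Real Flink jobs use the assigner classes listed above instead.

```java
// Hypothetical sketch, not Flink code, of how event time advances.
// Assumption: a periodic generator with a fixed out-of-orderness bound (maxDelayMs).
final class WatermarkSketch {

    // Periodic watermark: highest timestamp seen so far minus the allowed delay.
    static long periodicWatermark(long[] timestampsSeen, long maxDelayMs) {
        if (timestampsSeen.length == 0) {
            return Long.MIN_VALUE; // nothing seen yet: event time has not started
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestampsSeen) {
            max = Math.max(max, ts);
        }
        return max - maxDelayMs;
    }

    // An operator with several input streams advances to the minimum of their watermarks.
    static long operatorWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // A watermark t means "no more events with timestamp <= t", so the window [start, end)
    // is complete once the watermark reaches its last timestamp, end - 1.
    static boolean windowCanFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long wmA = periodicWatermark(new long[] {1_000, 3_500, 2_000}, 1_000); // 2500
        long wmB = periodicWatermark(new long[] {2_800}, 1_000);               // 1800
        long wm = operatorWatermark(wmA, wmB);                                 // 1800
        System.out.println("operator watermark = " + wm);
        System.out.println("window [0, 2000) can fire: " + windowCanFire(2_000, wm)); // false
    }
}
```

The slower input holds back the operator's event time. Here the window [0, 2000) must wait until the second stream's watermark also reaches 1999.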
In the case of keyed streams, any attribute of the incoming events can be used as a key. A keyed stream allows windowed computations to be performed in parallel by multiple tasks, because each logical keyed stream can be processed independently from the rest. All elements with the same key are sent to the same parallel task. In the case of non-keyed streams, the original stream is not split into multiple logical streams, and all the windowing logic is performed by a single task. Windows are defined with window() on keyed streams and with windowAll() on non-keyed streams.
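Keyed processing (flatMap, then keyBy, then a rolling reduce) can be pictured with the classic word count. This plain-Java sketch uses hypothetical names and no Flink dependency. It also simplifies routing: a key goes to task hashCode mod parallelism, whereas Flink hashes keys into key groups. The property shown is the same in both cases: every occurrence of a key is handled by the same task, which keeps that key's state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of flatMap -> keyBy -> reduce (rolling word count).
// Simplification: a key is routed to task hashCode mod parallelism (Flink uses key groups).
final class KeyedWordCount {

    static int taskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Emits "word=count" for every word, like a rolling reduce (value1 + value2) on a keyed stream.
    static List<String> rollingCounts(int parallelism, String... sentences) {
        // Each parallel task keeps its own state and only sees the keys routed to it.
        List<Map<String, Integer>> taskState = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            taskState.add(new HashMap<>());
        }
        List<String> out = new ArrayList<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) { // flatMap: zero or more words per sentence
                if (word.isEmpty()) {
                    continue;
                }
                Map<String, Integer> state = taskState.get(taskFor(word, parallelism)); // keyBy
                int count = state.merge(word, 1, Integer::sum);                         // reduce
                out.add(word + "=" + count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(rollingCounts(2, "to be or", "not to be"));
        // [to=1, be=1, or=1, not=1, to=2, be=2]
    }
}
```

The rolling reduce emits an updated count for every incoming word rather than one final result, which is how reduce behaves on an unwindowed keyed stream.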
FLINK WINDOW CODE (KEYED)-
TUMBLING (no overlap)-
keyedStream.timeWindow(Time.seconds(5)): elements are grouped according to their timestamp into groups of 5-second duration, and every element belongs to exactly one window.
keyedStream.countWindow(1000): elements are grouped in arrival order (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window.
SLIDING (with overlap)-
keyedStream.timeWindow(Time.seconds(5), Time.seconds(1)): elements are grouped according to their timestamp into groups of 5-second duration, and an element can belong to more than one window (consecutive windows overlap by 4 seconds, so each element belongs to 5 windows).
keyedStream.countWindow(1000, 100): elements are grouped in arrival order into groups of 1000 elements, and an element can belong to more than one window (consecutive windows overlap by 900 elements).
GLOBAL (all elements in one window)-
stream.window(GlobalWindows.create()): all incoming elements of a given key are assigned to the same window.
stream.window(TumblingEventTimeWindows.of(Time.seconds(1))) (or TumblingProcessingTimeWindows): incoming elements are assigned to a window of a certain size (1 second here) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))) (or SlidingProcessingTimeWindows): incoming elements are assigned to a window of a certain size (5 seconds here) based on their timestamp. Windows "slide" by the provided value (1 second in the example) and hence overlap.
FLINK WINDOW CODE (NON-KEYED)-
TUMBLING-
nonKeyedStream.timeWindowAll(Time.seconds(5)): elements are grouped according to their timestamp into groups of 5-second duration, and every element belongs to exactly one window.
nonKeyedStream.countWindowAll(1000): elements are grouped in arrival order (equivalent to processing time) into groups of 1000 elements, and every element belongs to exactly one window.
SLIDING-
nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1)): elements are grouped according to their timestamp into groups of 5-second duration, and an element can belong to more than one window (consecutive windows overlap by 4 seconds).
KEYED AND NON-KEYED-
SESSION WINDOWS (e.g., a session ends when no product is added in the 10 seconds after the last one)-
stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5))) (or EventTimeSessionWindows, and windowAll(...) on a non-keyed stream): a session is a period of activity that is preceded and followed by a period of inactivity. This window uses a timeout (the gap) that specifies how long to wait before considering that a session has ended.
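A session window has no fixed start or end. It keeps growing while events arrive within the gap. Flink's session assigners give each element a window [timestamp, timestamp + gap) and merge the windows that overlap. The plain-Java sketch below mimics that idea for a single key. It uses hypothetical names and timestamps in ms, and it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java sketch of session windows for one key: every event opens a window
// [ts, ts + gap), and windows that overlap are merged into one session.
final class SessionWindowSketch {

    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts);
        List<long[]> result = new ArrayList<>();
        for (long t : ts) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {
                last[1] = t + gapMs;                   // still within the gap: extend the current session
            } else {
                result.add(new long[] {t, t + gapMs}); // inactivity longer than the gap: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000, 4_000, 20_000, 25_000, 40_000}, 10_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // [1000, 14000)  [20000, 35000)  [40000, 50000)
    }
}
```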
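EXAM CODE (July exam)- Temperature part: keep single measures > 50. Purchase part (CustomerPurchase): raise an alarm when a customer buys the same product more than 9 times in a session with a 10 s gap.
import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class JulyExam {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.readTextFile(params.get("input"));
        env.getConfig().setGlobalJobParameters(params);

        // Temperature part: parse "long,string,double" lines; f2 holds the measure
        SingleOutputStreamOperator<Tuple3<Long, String, Double>> mapStreamA = text.map(new MapFunction<String, Tuple3<Long, String, Double>>() {
            @Override
            public Tuple3<Long, String, Double> map(String s) throws Exception {
                String[] fieldArray = s.split(",");
                return new Tuple3<>(Long.parseLong(fieldArray[0]), fieldArray[1], Double.parseDouble(fieldArray[2]));
            }
        });
        SingleOutputStreamOperator<Tuple3<Long, String, Double>> filterStreamA = mapStreamA.filter(new FilterFunction<Tuple3<Long, String, Double>>() {
            @Override
            public boolean filter(Tuple3<Long, String, Double> in) throws Exception {
                return in.f2 > 50; // compare the measure, not the timestamp in f0
            }
        });

        // Purchase part: parse "timestamp,product,customer" lines (the types used by SumProduct)
        SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> purchases = text.map(new MapFunction<String, Tuple3<Long, Integer, Integer>>() {
            @Override
            public Tuple3<Long, Integer, Integer> map(String s) throws Exception {
                String[] f = s.split(",");
                return new Tuple3<>(Long.parseLong(f[0]), Integer.parseInt(f[1]), Integer.parseInt(f[2]));
            }
        });
        // Key by (product, customer); also keying by the timestamp would give almost every purchase its own key
        KeyedStream<Tuple3<Long, Integer, Integer>, Tuple> keyedStream = purchases.keyBy(1, 2);
        // A session closes after 10 s without purchases of that product by that customer
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> sumProducts = keyedStream
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .apply(new SumProduct());
        // Alarm when more than 9 purchases happened in one session
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> alarm = sumProducts.filter(new FilterFunction<Tuple4<Long, Integer, Integer, Integer>>() {
            @Override
            public boolean filter(Tuple4<Long, Integer, Integer, Integer> in) throws Exception {
                return in.f3 > 9;
            }
        });
        // emit result
        if (params.has("output")) {
            alarm.writeAsCsv(params.get("output"));
        }
        // execute program
        env.execute("CustomerPurchase");
    }

    // Counts the purchases in one session window and emits (first timestamp, product, customer, quantity)
    public static class SumProduct implements WindowFunction<Tuple3<Long, Integer, Integer>, Tuple4<Long, Integer, Integer, Integer>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow tw, Iterable<Tuple3<Long, Integer, Integer>> input, Collector<Tuple4<Long, Integer, Integer, Integer>> out) throws Exception {
            Iterator<Tuple3<Long, Integer, Integer>> iterator = input.iterator();
            Tuple3<Long, Integer, Integer> first = iterator.next(); // a fired window is never empty
            int quantity = 1;
            while (iterator.hasNext()) {
                iterator.next();
                quantity += 1;
            }
            out.collect(new Tuple4<>(first.f0, first.f1, first.f2, quantity));
        }
    }
}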
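The windowing logic of the CustomerPurchase job can be checked without a cluster. The plain-Java simulation below uses hypothetical names. It makes three assumptions: each purchase is {timestamp in ms, product, customer}, the input is sorted by time, and those timestamps stand in for processing time. It keys purchases by product and customer and closes a session after more than 10 s without purchases. It then counts the purchases as SumProduct does and keeps the sessions with more than 9 of them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of the CustomerPurchase job on a finite input.
// Assumptions: each purchase is {timestampMs, product, customer}, the input is sorted by time,
// and these timestamps stand in for the processing time at which purchases arrive.
final class CustomerPurchaseSimulation {

    // Returns {firstTs, product, customer, quantity} for every session with more than maxPerSession purchases.
    static List<long[]> alarms(long[][] purchases, long gapMs, int maxPerSession) {
        // open session per (product, customer): {firstTs, lastTs, product, customer, quantity}
        Map<String, long[]> open = new LinkedHashMap<>();
        List<long[]> out = new ArrayList<>();
        for (long[] p : purchases) {
            String key = p[1] + "," + p[2];             // key by (product, customer)
            long[] s = open.get(key);
            if (s != null && p[0] - s[1] > gapMs) {     // gap exceeded: the session window fires
                emitIfAlarm(s, maxPerSession, out);
                s = null;
            }
            if (s == null) {
                s = new long[] {p[0], p[0], p[1], p[2], 0};
                open.put(key, s);
            }
            s[1] = p[0]; // last activity
            s[4]++;      // count the purchases in the session, as SumProduct does
        }
        for (long[] s : open.values()) {                // end of input closes the remaining sessions
            emitIfAlarm(s, maxPerSession, out);
        }
        return out;
    }

    // The alarm filter: quantity > maxPerSession
    private static void emitIfAlarm(long[] s, int maxPerSession, List<long[]> out) {
        if (s[4] > maxPerSession) {
            out.add(new long[] {s[0], s[2], s[3], s[4]});
        }
    }

    public static void main(String[] args) {
        long[][] purchases = new long[12][];
        for (int i = 0; i < 10; i++) {
            purchases[i] = new long[] {i * 1_000L, 7, 1}; // customer 1 buys product 7 ten times, 1 s apart
        }
        purchases[10] = new long[] {10_000, 7, 2};        // customer 2 buys it once
        purchases[11] = new long[] {30_000, 7, 1};        // after a 21 s pause: new session for customer 1
        for (long[] a : alarms(purchases, 10_000, 9)) {
            System.out.println("alarm: ts=" + a[0] + " product=" + a[1] + " customer=" + a[2] + " quantity=" + a[3]);
        }
    }
}
```

In the real job, a processing-time session fires once 10 s pass without input. This simulation closes it at the next purchase for the same key, or at the end of the input, which yields the same sessions for a finite, time-ordered input.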
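TIMESTAMPS FROM A FIELD (fragment; f0 holds the time in seconds, Flink expects milliseconds)-
.assignTimestampsAndWatermarks(
    // AscendingTimestampExtractor is in org.apache.flink.streaming.api.functions.timestamps
    new AscendingTimestampExtractor<Tuple4<Integer, Integer, Integer, Integer>>() { // types of f1..f3 assumed
        @Override
        public long extractAscendingTimestamp(Tuple4<Integer, Integer, Integer, Integer> element) {
            return element.f0 * 1000L; // long multiplication avoids int overflow
        }
    });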
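The extractor above turns a seconds field into milliseconds. Because timestamps only grow, the watermark can simply follow the latest timestamp. The plain-Java sketch below shows both points with hypothetical names and no Flink dependency. It shows why the multiplication should be done in long arithmetic: `element.F0 * 1000` on an int overflows for large second values such as Unix epoch seconds. It also assumes that the watermark trails the latest timestamp by 1 ms, because a watermark t promises no more events with timestamp <= t and the next event may share the same timestamp. Out-of-order events are simply rejected here.

```java
// Hypothetical plain-Java sketch, not Flink code: seconds-to-milliseconds conversion and an
// ascending-timestamp watermark that trails the latest timestamp by 1 ms (an assumption).
final class AscendingTimestamps {

    static long toMillis(int seconds) {
        return seconds * 1000L; // long multiplication: no overflow
    }

    static int toMillisIntOnly(int seconds) {
        return seconds * 1000; // int multiplication: overflows for seconds >= 2,147,484 (about 25 days)
    }

    private long current = Long.MIN_VALUE;

    // Records the timestamp of a new event and returns the resulting watermark.
    long onEvent(long timestampMs) {
        if (timestampMs < current) {
            // this sketch rejects out-of-order events
            throw new IllegalArgumentException("timestamp not ascending: " + timestampMs);
        }
        current = timestampMs;
        return current - 1;
    }

    public static void main(String[] args) {
        System.out.println(toMillis(2_147_484));        // 2147484000
        System.out.println(toMillisIntOnly(2_147_484)); // negative: int overflow
        AscendingTimestamps extractor = new AscendingTimestamps();
        extractor.onEvent(toMillis(3));
        System.out.println(extractor.onEvent(toMillis(5))); // 4999
    }
}
```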