Mastering Apache Flink and HBase Data Operations
Identify High-Volume Customer Purchases in Flink
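The following Java code uses Apache Flink to identify customers who buy the same item at least 10 times within a processing-time session window with a 10-second gap. The job counts one unit per input record and keeps only the groups whose count exceeds 9.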
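To make the session-window idea concrete before reading the Flink job, here is a minimal plain-Java sketch with no Flink dependency. It groups sorted arrival times into sessions and counts the records in each one. The class name SessionCountSketch, the method sessionSizes, and the sample arrival times are hypothetical, and the rule used here (a new session starts when the pause is strictly longer than the gap) is a simplifying assumption rather than a statement of Flink's exact boundary handling.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionCountSketch {
    // Splits sorted arrival times (in seconds) into sessions and returns the
    // number of records in each session. Assumption of this sketch: a new
    // session starts when the pause since the previous record exceeds the gap.
    static List<Integer> sessionSizes(long[] sortedArrivalSeconds, long gapSeconds) {
        List<Integer> sizes = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < sortedArrivalSeconds.length; i++) {
            boolean startsNewSession = i > 0
                    && sortedArrivalSeconds[i] - sortedArrivalSeconds[i - 1] > gapSeconds;
            if (startsNewSession) {
                sizes.add(current);
                current = 0;
            }
            current++;
        }
        if (current > 0) {
            sizes.add(current);
        }
        return sizes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 2, 5, 30, 31}; // hypothetical arrival times
        System.out.println(sessionSizes(arrivals, 10)); // prints [3, 2]
    }
}
```

In the Flink job the same grouping happens per key, so each combination of key fields gets its own sessions and its own counts.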
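import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CustomerPurchase {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.readTextFile(params.get("input"));
        env.getConfig().setGlobalJobParameters(params);
        // Parse each line into three numeric fields; SumProduct names them timestamp, product, customer.
        SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> mapStream = text.map(new MapFunction<String, Tuple3<Long, Integer, Integer>>() {
            @Override
            public Tuple3<Long, Integer, Integer> map(String in) throws Exception {
                String[] fieldArray = in.split(",");
                return new Tuple3<>(Long.parseLong(fieldArray[0]), Integer.parseInt(fieldArray[1]), Integer.parseInt(fieldArray[2]));
            }
        });
        // Key by all three fields: records are counted together only when all three values match.
        KeyedStream<Tuple3<Long, Integer, Integer>, Tuple> keyedStream = mapStream.keyBy(0, 1, 2);
        // A session closes after 10 seconds of processing time without a new record for the key.
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> sumProducts = keyedStream
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
            .apply(new CustomerPurchase.SumProduct());
        // Keep only the groups with a count of 10 or more.
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> alarm = sumProducts.filter(new FilterFunction<Tuple4<Long, Integer, Integer, Integer>>() {
            @Override
            public boolean filter(Tuple4<Long, Integer, Integer, Integer> in) throws Exception {
                return in.f3 > 9;
            }
        });
        if (params.has("output")) {
            alarm.writeAsCsv(params.get("output"));
        }
        env.execute("CustomerPurchase");
    }

    // Counts the records in one window and emits (timestamp, product, customer, count).
    public static class SumProduct implements WindowFunction<Tuple3<Long, Integer, Integer>, Tuple4<Long, Integer, Integer, Integer>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple3<Long, Integer, Integer>> input, Collector<Tuple4<Long, Integer, Integer, Integer>> out) throws Exception {
            // A window fires only when it holds at least one record, so the first next() is safe.
            Iterator<Tuple3<Long, Integer, Integer>> iterator = input.iterator();
            Tuple3<Long, Integer, Integer> first = iterator.next();
            int quantity = 1;
            while (iterator.hasNext()) {
                iterator.next();
                quantity++;
            }
            out.collect(new Tuple4<>(first.f0, first.f1, first.f2, quantity));
        }
    }
}
Calculating HFiles in HBase Region Servers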
Consider an HBase Region Server hosting a table with two column families and two regions. How many HFiles are generated?
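There are 4 HFiles in total. Each region keeps one store per column family, and each store writes its data to its own HFile. With 2 regions and 2 column families, the calculation is: 2 regions × 2 column families = 4 HFiles. This count assumes one HFile per store; a store that has been flushed several times without compaction holds more than one.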
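The arithmetic can be written as a short Java sketch. The class name HFileEstimate and the method storeCount are hypothetical, and the result equals the number of HFiles only under the assumption that every store holds exactly one file.

```java
final class HFileEstimate {
    // One store exists per (region, column family) pair. Assuming each store
    // holds exactly one HFile, the store count equals the HFile count.
    static int storeCount(int regions, int columnFamilies) {
        return regions * columnFamilies;
    }

    public static void main(String[] args) {
        System.out.println(storeCount(2, 2)); // prints 4
    }
}
```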
HBase Table Creation and Data Distribution
The Customer table uses a composite key consisting of the fields C_W_ID, C_D_ID, and C_ID. It contains two column families: Personal_Data and Purchases_Data. Given a cluster with six computers, one of the computers is used to create the table and distribute the data as follows:
create 'Customer', 'Personal_Data', 'Purchases_Data'
split 'Customer', '5,1,1'
split 'Customer', '9,1,1'
split 'Customer', '13,1,1'
split 'Customer', '17,1,1'
move 'reg[-5,1,1)', 'rs1'
move 'reg[5,1,1-9,1,1)', 'rs2'
move 'reg[9,1,1-13,1,1)', 'rs3'
move 'reg[13,1,1-17,1,1)', 'rs4'
move 'reg[17,1,1-)', 'rs5'
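The resulting layout can be illustrated with a small Java sketch that maps a composite key to the region server holding it. The class RegionLookupSketch and its method serverFor are hypothetical and do not use the HBase client API. The sketch assumes the three key fields are compared as numbers, field by field, which is what the region ranges above suggest; HBase itself orders row keys as raw bytes, so a key stored as the plain string '13,1,1' would sort before '5,1,1' unless the fields are encoded with a fixed width.

```java
import java.util.TreeMap;

final class RegionLookupSketch {
    // Composite row key (C_W_ID, C_D_ID, C_ID), compared numerically field by
    // field. The numeric ordering is an assumption of this sketch.
    static final class Key implements Comparable<Key> {
        final int w, d, c;

        Key(int w, int d, int c) {
            this.w = w;
            this.d = d;
            this.c = c;
        }

        @Override
        public int compareTo(Key o) {
            if (w != o.w) return Integer.compare(w, o.w);
            if (d != o.d) return Integer.compare(d, o.d);
            return Integer.compare(c, o.c);
        }
    }

    // Start key of each region mapped to its server, mirroring the split and move steps.
    private static final TreeMap<Key, String> REGION_START_TO_SERVER = new TreeMap<>();

    static {
        int min = Integer.MIN_VALUE; // stands in for the open lower bound of the first region
        REGION_START_TO_SERVER.put(new Key(min, min, min), "rs1");
        REGION_START_TO_SERVER.put(new Key(5, 1, 1), "rs2");
        REGION_START_TO_SERVER.put(new Key(9, 1, 1), "rs3");
        REGION_START_TO_SERVER.put(new Key(13, 1, 1), "rs4");
        REGION_START_TO_SERVER.put(new Key(17, 1, 1), "rs5");
    }

    // Returns the server of the region whose start key is the greatest one
    // that is less than or equal to the given row key.
    static String serverFor(int w, int d, int c) {
        return REGION_START_TO_SERVER.floorEntry(new Key(w, d, c)).getValue();
    }

    public static void main(String[] args) {
        System.out.println(serverFor(7, 3, 42)); // prints rs2
    }
}
```

Region start keys are inclusive and end keys are exclusive, so the key 5,1,1 itself belongs to the region served by rs2, while 4,10,3000 still falls on rs1.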