Mastering Apache Flink and HBase Data Operations

Identify High-Volume Customer Purchases in Flink

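The following Java program uses Apache Flink to flag customers who buy more than 10 units of the same product within one session. Each input line has the form timestamp,product,customer and represents the purchase of one unit. Records are grouped by product and customer, and a processing-time session window closes once 10 seconds pass without a new purchase for that key.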

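// Written against the Flink 1.x DataStream API (positional keyBy and writeAsCsv are deprecated in later releases).
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CustomerPurchase {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataStream<String> text = env.readTextFile(params.get("input"));

        // Parse "timestamp,product,customer" into (f0 = timestamp, f1 = product, f2 = customer).
        // Each line counts as the purchase of one unit.
        SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> mapStream = text.map(new MapFunction<String, Tuple3<Long, Integer, Integer>>() {
            @Override
            public Tuple3<Long, Integer, Integer> map(String in) throws Exception {
                String[] fieldArray = in.split(",");
                return new Tuple3<>(Long.parseLong(fieldArray[0].trim()), Integer.parseInt(fieldArray[1].trim()), Integer.parseInt(fieldArray[2].trim()));
            }
        });

        // Key by (product, customer) only. Also keying by the timestamp (field 0) would give
        // almost every purchase its own key, so no session could ever count more than 1 unit.
        KeyedStream<Tuple3<Long, Integer, Integer>, Tuple> keyedStream = mapStream.keyBy(1, 2);

        // A session for a key closes after 10 seconds without a new purchase.
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> sumProducts = keyedStream
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
            .apply(new CustomerPurchase.SumProduct());

        // Keep only sessions with more than 10 units.
        SingleOutputStreamOperator<Tuple4<Long, Integer, Integer, Integer>> alarm = sumProducts.filter(new FilterFunction<Tuple4<Long, Integer, Integer, Integer>>() {
            @Override
            public boolean filter(Tuple4<Long, Integer, Integer, Integer> in) throws Exception {
                return in.f3 > 10;
            }
        });

        if (params.has("output")) {
            alarm.writeAsCsv(params.get("output"));
        } else {
            alarm.print();  // the job needs at least one sink to run
        }
        env.execute("CustomerPurchase");
    }

    // Counts the purchases (units) in one session window for one (product, customer) key and emits
    // (timestamp of the first record in the window, product, customer, quantity).
    public static class SumProduct implements WindowFunction<Tuple3<Long, Integer, Integer>, Tuple4<Long, Integer, Integer, Integer>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple3<Long, Integer, Integer>> input, Collector<Tuple4<Long, Integer, Integer, Integer>> out) throws Exception {
            Tuple3<Long, Integer, Integer> first = null;
            int quantity = 0;
            for (Tuple3<Long, Integer, Integer> purchase : input) {
                if (first == null) {
                    first = purchase;
                }
                quantity++;
            }
            // A window only fires when it holds at least one element, so first is non-null here.
            out.collect(new Tuple4<>(first.f0, first.f1, first.f2, quantity));
        }
    }
}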
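
The windowing logic can be checked without a Flink cluster. The following stdlib-only Java sketch is a hypothetical simulation, not Flink code: the class, record and method names are invented for illustration. It groups purchases by (product, customer), starts a new session whenever two consecutive purchases are more than 10 seconds apart, and flags sessions with more than 10 units. Unlike the job, it uses the timestamp stored in each record (event time) instead of the machine clock (processing time), so its result is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of the job's logic (no Flink). Sessions are built from
// record timestamps (event time), not from the processing-time clock the Flink job uses.
public class SessionPurchaseSimulation {
    static final long GAP_MS = 10_000;  // session gap, matching the job's 10 seconds
    static final int THRESHOLD = 10;    // flag sessions with more than 10 units

    record Purchase(long ts, int product, int customer) {}
    record Alarm(int product, int customer, long sessionStart, int quantity) {}

    static List<Alarm> findAlarms(List<Purchase> purchases) {
        // Group by (product, customer), like keyBy(1, 2) in the job.
        Map<List<Integer>, List<Purchase>> byKey = new LinkedHashMap<>();
        for (Purchase p : purchases) {
            byKey.computeIfAbsent(List.of(p.product(), p.customer()), k -> new ArrayList<>()).add(p);
        }
        List<Alarm> alarms = new ArrayList<>();
        for (List<Purchase> group : byKey.values()) {
            group.sort(Comparator.comparingLong(Purchase::ts));
            Purchase first = group.get(0);
            long sessionStart = first.ts();
            long lastTs = first.ts();
            int quantity = 0;
            for (Purchase p : group) {
                if (p.ts() - lastTs > GAP_MS) {
                    // Gap exceeded: close the current session and open a new one.
                    if (quantity > THRESHOLD) {
                        alarms.add(new Alarm(first.product(), first.customer(), sessionStart, quantity));
                    }
                    sessionStart = p.ts();
                    quantity = 0;
                }
                quantity++;  // each record is one unit
                lastTs = p.ts();
            }
            if (quantity > THRESHOLD) {  // close the last session of this key
                alarms.add(new Alarm(first.product(), first.customer(), sessionStart, quantity));
            }
        }
        return alarms;
    }

    public static void main(String[] args) {
        List<Purchase> purchases = new ArrayList<>();
        // Customer 7 buys product 42 eleven times, one second apart: one session of 11 units.
        for (int i = 0; i < 11; i++) purchases.add(new Purchase(i * 1_000L, 42, 7));
        // Customer 8 buys product 42 six times, pauses 25 s, then buys six more: two sessions of 6.
        for (int i = 0; i < 6; i++) purchases.add(new Purchase(i * 1_000L, 42, 8));
        for (int i = 0; i < 6; i++) purchases.add(new Purchase(30_000L + i * 1_000L, 42, 8));
        findAlarms(purchases).forEach(System.out::println);
    }
}
```

Running main prints a single alarm, for customer 7. Customer 8 buys twelve units in total, but they fall into two sessions of six, so neither session crosses the threshold.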

Calculating HFiles in HBase Region Servers

Consider an HBase Region Server hosting a table that has two column families and is split into two regions. How many HFiles are generated?

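In the simplest case there are 4 HFiles. Each region keeps one Store per column family, and each Store persists its data in its own HFiles, so 2 regions × 2 column families = 4 Stores. The answer of 4 assumes each Store holds exactly one HFile, for example right after a major compaction; in general a Store gains a new HFile with every MemStore flush (and has none before its first flush), so the actual count can differ.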
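
A small stdlib-only Java sketch of the counting argument: it enumerates one Store per (region, column family) pair. The region and family names are hypothetical, and the final count assumes one HFile per Store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: one Store per (region, column family) pair.
public class HFileCount {
    static List<String> stores(List<String> regions, List<String> families) {
        List<String> result = new ArrayList<>();
        for (String region : regions) {
            for (String family : families) {
                result.add(region + "/" + family);  // each Store writes its own HFiles
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> s = stores(List.of("region1", "region2"), List.of("cf1", "cf2"));
        s.forEach(System.out::println);
        // Assuming exactly one HFile per Store (e.g. after a major compaction):
        System.out.println("HFiles: " + s.size());
    }
}
```

With two regions and two families it lists four Stores and prints HFiles: 4.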

HBase Table Creation and Data Distribution

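The Customer table uses a composite row key made of the fields C_W_ID, C_D_ID and C_ID, and has two column families: Personal_Data and Purchases_Data. The cluster has six machines: one issues the HBase shell commands below to create the table and distribute its data, and the other five act as Region Servers rs1 to rs5. The four split keys divide the table into five regions, one per Region Server; each split key marks district 1, customer 1 of warehouses 5, 9, 13 and 17. Because HBase compares row keys byte by byte, these ranges follow numeric order only if each key component is written with a fixed width (for example zero-padded); as plain strings, '13,1,1' sorts before '5,1,1'. The region and server arguments of the move commands are shorthand: the actual shell command takes the region's encoded name and the full server name.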

  • create 'Customer', 'Personal_Data', 'Purchases_Data'
  • split 'Customer', '5,1,1'
  • split 'Customer', '9,1,1'
  • split 'Customer', '13,1,1'
  • split 'Customer', '17,1,1'
  • move 'reg[-5,1,1)', 'rs1'
  • move 'reg[5,1,1-9,1,1)', 'rs2'
  • move 'reg[9,1,1-13,1,1)', 'rs3'
  • move 'reg[13,1,1-17,1,1)', 'rs4'
  • move 'reg[17,1,1-)', 'rs5'
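
To see how the split keys route rows, the following hypothetical Java sketch mimics HBase's region lookup: a row belongs to the region whose start key is the largest split key not greater than the row key, with keys compared lexicographically (for ASCII keys, String.compareTo gives the same order as HBase's byte-wise comparison). The zero-padded key format and its field widths are assumptions chosen for illustration, not part of the original schema.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation of HBase region lookup for the Customer table.
public class RegionRouting {
    // Index of the region holding rowKey: the number of split keys that are <= rowKey.
    static int regionIndex(String rowKey, List<String> splitKeys) {
        List<String> sorted = new ArrayList<>(splitKeys);
        Collections.sort(sorted);  // regions are ordered by their start keys in byte order
        int index = 0;
        for (String split : sorted) {
            if (rowKey.compareTo(split) >= 0) index++;  // a region's start key is inclusive
        }
        return index;
    }

    // Assumed fixed-width encoding of (C_W_ID, C_D_ID, C_ID) so byte order matches numeric order.
    static String key(int wId, int dId, int cId) {
        return String.format("%03d,%02d,%04d", wId, dId, cId);
    }

    public static void main(String[] args) {
        List<String> plainSplits = new ArrayList<>(List.of("5,1,1", "9,1,1", "13,1,1", "17,1,1"));
        Collections.sort(plainSplits);
        System.out.println("Plain split keys in byte order: " + plainSplits);

        List<String> paddedSplits = List.of(key(5, 1, 1), key(9, 1, 1), key(13, 1, 1), key(17, 1, 1));
        List<String> servers = List.of("rs1", "rs2", "rs3", "rs4", "rs5");
        for (int wId : new int[] {2, 6, 10, 14, 20}) {
            String rowKey = key(wId, 3, 42);
            System.out.println(rowKey + " -> " + servers.get(regionIndex(rowKey, paddedSplits)));
        }
    }
}
```

The first line printed is [13,1,1, 17,1,1, 5,1,1, 9,1,1], showing that unpadded keys do not sort numerically. With padded keys, warehouses 2, 6, 10, 14 and 20 land on rs1 to rs5 in turn.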
