
Apache HBase Connector

Apache HBase is a highly reliable, high-performance, column-oriented, scalable distributed storage system. With HBase, large-scale structured storage clusters can be built on inexpensive commodity servers. Unlike a typical relational database, HBase is well suited to storing unstructured data because its storage model is column-oriented rather than row-oriented.

Flink does not officially provide a DataStream connector for HBase. StreamPark encapsulates HBaseSource and HBaseSink on top of hbase-client: connections are created automatically from configuration, which simplifies development. When checkpointing is enabled, StreamPark's HBase source records the latest state of the data it has read, and the source offset can be restored from the data itself, providing AT_LEAST_ONCE semantics on the source side.

HBaseSource implements Flink Async I/O to improve streaming throughput. The sink side supports AT_LEAST_ONCE by default and EXACTLY_ONCE when checkpointing is enabled.

hint

When checkpointing is enabled, StreamPark's HBase source records the latest state of the data it has read. Whether that state can be restored after the job resumes depends entirely on whether the data itself carries an offset identifier, which must be specified manually in code. The recovery logic is supplied through the parameters of the getDataStream method of HBaseSource, as sketched below.
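
A minimal sketch of that idea, meant to live inside handle() of a FlinkStreaming application with the implicit HBase properties in scope (see the full example further below). The person table, the use of the row key as the offset, and Scan.withStartRow are assumptions for illustration, not StreamPark's prescribed recovery logic:

import org.apache.streampark.flink.core.java.wrapper.HBaseQuery
import org.apache.streampark.flink.core.scala.source.HBaseSource
import org.apache.flink.api.scala.createTypeInformation
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes

// Inside handle(), with the implicit HBase properties in scope:
val source = HBaseSource().getDataStream[String](
  last => {
    // `last` is the most recent value produced by `func` (the row key below),
    // restored from checkpointed state when the job resumes.
    val scan = new Scan()
    // Hypothetical recovery logic: resume the scan just after the last row key.
    if (last != null) scan.withStartRow(Bytes.toBytes(last), false)
    new HBaseQuery("person", scan)
  },
  r => new String(r.getRow), // func: the emitted value doubles as the offset identifier
  null
)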

Dependencies for reading and writing HBase

HBase Maven Dependency

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>

Regular way to read and write HBase

1. Create the table

create 'Student', {NAME => 'StuInfo', VERSIONS => 3}, {NAME => 'Grades', BLOCKCACHE => true}

2. Read demo


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

public class FlinkHBaseReader {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Add the HBase source
        DataStream<String> stream = env.addSource(new HBaseReader());
        stream.print();
        env.execute("FlinkHBaseDemo");
    }
}

class HBaseReader extends RichSourceFunction<String> {
    private Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "localhost");
        hconf.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(hconf);
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // Scan the 'StuInfo' column family of the 'Student' table and emit each row as a string
        table = connection.getTable(TableName.valueOf("Student"));
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("StuInfo"));
        rs = table.getScanner(scan);
        for (Result result : rs) {
            StringBuilder sb = new StringBuilder();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append("-");
            }
            // Drop the trailing separator
            String value = sb.replace(sb.length() - 1, sb.length(), "").toString();
            sourceContext.collect(value);
        }
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        if (rs != null) {
            rs.close();
        }
        if (table != null) {
            table.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

Reading and writing HBase this way is cumbersome and inconvenient. StreamPark follows the principle of convention over configuration and automatic configuration: users only need to configure the HBase connection parameters and the Flink runtime parameters, and StreamPark automatically assembles the source and sink. This greatly simplifies the development logic and improves development efficiency and maintainability.

Write and read HBase with Apache StreamPark™

1. Configure policies and connection information

# hbase
hbase:
  zookeeper.quorum: test1,test2,test6
  zookeeper.property.clientPort: 2181
  zookeeper.session.timeout: 1200000
  rpc.timeout: 5000
  client.pause: 20

2. Read and write HBase

Reading from and writing to HBase with StreamPark is very simple; the code is as follows:


import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.core.java.wrapper.HBaseQuery
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.request.HBaseRequest
import org.apache.streampark.flink.core.scala.source.HBaseSource
import org.apache.flink.api.scala.createTypeInformation
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.{Get, Scan}
import org.apache.hadoop.hbase.util.Bytes

import java.util

object HBaseSourceApp extends FlinkStreaming {

  override def handle(): Unit = {

    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)

    val id = HBaseSource().getDataStream[String](
      query => {
        new HBaseQuery("person", new Scan())
      },
      // The following function determines the logic for restoring offsets from checkpoints
      r => new String(r.getRow),
      null)

    // Flink Async I/O
    HBaseRequest(id).requestOrdered(x => {
      new HBaseQuery("person", new Get(x.getBytes()))
    }, (a, r) => {
      val map = new util.HashMap[String, String]()
      val cellScanner = r.cellScanner()
      while (cellScanner.advance()) {
        val cell = cellScanner.current()
        val q = Bytes.toString(CellUtil.cloneQualifier(cell))
        val (name, v) = q.split("_") match {
          case Array(_type, name) =>
            _type match {
              case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
              case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
              case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
              case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
            }
          case _ =>
        }
        map.put(name.toString, v.toString)
      }
      map.toString
    }).print("Async")
  }

}

When reading HBase with StreamPark, you need to provide the method that creates the HBaseQuery, the method that converts the query Result into the required object, a function that indicates whether the source is still running, and the job parameters. The details are as follows:

/**
 * @param ctx
 * @param property
 */
class HBaseSource(@(transient@param) val ctx: StreamingContext, property: Properties = new Properties()) {

  /**
   * @param query   method that creates the HBaseQuery
   * @param func    method that converts the query Result into the expected object
   * @param running function that indicates whether the source should keep running
   * @param prop    job parameters
   * @tparam R
   * @return
   */
  def getDataStream[R: TypeInformation](query: R => HBaseQuery,
                                        func: Result => R,
                                        running: Unit => Boolean)(implicit prop: Properties = new Properties()) = {
    Utils.copyProperties(property, prop)
    val hBaseFunc = new HBaseSourceFunction[R](prop, query, func, running)
    ctx.addSource(hBaseFunc)
  }

}
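
A minimal usage sketch of getDataStream with an explicit running predicate. It is assumed to sit inside handle() of the full example above, with the same imports and the implicit HBase properties in scope; the person table and the _ => true predicate are placeholder choices, not prescribed usage:

// The `running` predicate is presumably consulted by the source to decide whether
// to keep polling; `_ => true` keeps it alive (the full example above passes null).
val rowKeys = HBaseSource().getDataStream[String](
  _ => new HBaseQuery("person", new Scan()),   // query: the scan to execute
  r => new String(r.getRow),                   // func: Result -> emitted value (row key)
  _ => true                                    // running: keep the source alive
)
rowKeys.print()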

StreamPark's HBaseSource implements Flink Async I/O to improve streaming throughput: first create a DataStream, then create an HBaseRequest and call requestOrdered() or requestUnordered() to create an asynchronous stream, as follows:

class HBaseRequest[T: TypeInformation](@(transient@param) private val stream: DataStream[T], property: Properties = new Properties()) {

  /**
   * @param queryFunc  method that builds the HBaseQuery for each input element
   * @param resultFunc method that combines the input element and the query Result into the output
   * @param timeout    async request timeout in milliseconds
   * @param capacity   maximum number of in-flight async requests
   * @param prop       job parameters
   * @tparam R
   * @return
   */
  def requestOrdered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] = {
    Utils.copyProperties(property, prop)
    val async = new HBaseAsyncFunction[T, R](prop, queryFunc, resultFunc, capacity)
    AsyncDataStream.orderedWait(stream, async, timeout, TimeUnit.MILLISECONDS, capacity)
  }

  /**
   * @param queryFunc  method that builds the HBaseQuery for each input element
   * @param resultFunc method that combines the input element and the query Result into the output
   * @param timeout    async request timeout in milliseconds
   * @param capacity   maximum number of in-flight async requests
   * @param prop       job parameters
   * @tparam R
   * @return
   */
  def requestUnordered[R: TypeInformation](queryFunc: T => HBaseQuery, resultFunc: (T, Result) => R, timeout: Long = 1000, capacity: Int = 10)(implicit prop: Properties): DataStream[R] = {
    Utils.copyProperties(property, prop)
    val async = new HBaseAsyncFunction[T, R](prop, queryFunc, resultFunc, capacity)
    AsyncDataStream.unorderedWait(stream, async, timeout, TimeUnit.MILLISECONDS, capacity)
  }

}
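
As a usage sketch (a fragment for inside handle() of the example above, with the same imports and implicit properties in scope): when result order does not matter, the same lookup could go through requestUnordered with an explicit timeout and capacity. The values 5000 and 50 and the output string are arbitrary assumptions:

// Unordered async lookups: results are emitted as soon as each request completes,
// which usually yields higher throughput than requestOrdered.
HBaseRequest(id)
  .requestUnordered[String](
    rowKey => new HBaseQuery("person", new Get(rowKey.getBytes())),  // build one Get per input element
    (rowKey, result) => s"$rowKey -> empty=${result.isEmpty}",       // combine input and Result
    timeout = 5000,                                                  // milliseconds (see TimeUnit.MILLISECONDS above)
    capacity = 50                                                    // max concurrent in-flight requests
  )
  .print("AsyncUnordered")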

StreamPark supports two ways to write data: 1. addSink()  2. writeUsingOutputFormat(). Examples are as follows:

// 1) Insert way 1: HBaseSink
HBaseSink().sink[TestEntity](source, "order")

// 2) Insert way 2: writeUsingOutputFormat
// Specify the HBase configuration
implicit val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
source.writeUsingOutputFormat(new HBaseOutputFormat[TestEntity]("order", entry2Put))
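
The entry2Put function is referenced but not defined in the snippet above. A minimal sketch of what such a conversion might look like, assuming a hypothetical TestEntity with id, name, and score fields and a column family named f1:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical entity being written to the "order" table.
case class TestEntity(id: String, name: String, score: Double)

// Convert one entity into an HBase Put: the row key is the id,
// and each field becomes a column under the "f1" family.
val entry2Put: TestEntity => Put = entity => {
  val put = new Put(Bytes.toBytes(entity.id))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(entity.name))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("score"), Bytes.toBytes(entity.score))
  put
}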

Other configuration

All other configurations must follow the StreamPark configuration conventions. For the specific configurable items and the role of each parameter, please refer to the project configuration documentation.