
Project Configuration

Configuration plays a central role in StreamPark.

Why configuration is needed

A DataStream program is typically built in four steps:

  • Initialize and configure the StreamExecutionEnvironment
  • Create the source
  • Create the transformations
  • Create the sink

When developing a DataStream program, the first step is always to initialize the 'environment' and configure its parameters. These parameters fall into the following categories:

  • Parallelism
  • TimeCharacteristic
  • Checkpoint
  • Watermark
  • State Backend
  • Restart Strategy
  • Other...

These settings are largely the same from job to job, so this first step amounts to repetitive boilerplate.

The program is then submitted with a command such as:

```shell
flink run -m yarn-cluster -p 1 -c com.xx.Main job.jar
```

Developing a Flink SQL program likewise requires setting a series of environment parameters. The following example develops a program in pure SQL:


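```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JavaTableApp {

    public static void main(String[] args) {
        // Flink 1.12-era API: useBlinkPlanner() selects the Blink planner
        // (deprecated in later releases, where Blink is the only planner).
        EnvironmentSettings bbSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings);

        // Source: a datagen table producing 10 random rows per second.
        String sourceDDL = "CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='10', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='5', " +
                " 'fields.f_random_str.length'='10' " +
                ")";
        bsTableEnv.executeSql(sourceDDL);

        // Sink: prints every result row to stdout.
        String sinkDDL = "CREATE TABLE print_table (" +
                " f_random INT," +
                " c_val BIGINT, " +
                " wStart TIMESTAMP(3) " +
                ") WITH ('connector' = 'print') ";
        bsTableEnv.executeSql(sinkDDL);

        // DDL alone submits no job: count rows per f_random in 1-second tumbling windows.
        String insertDML = "INSERT INTO print_table " +
                "SELECT f_random, COUNT(*) AS c_val, TUMBLE_START(ts, INTERVAL '1' SECOND) AS wStart " +
                "FROM datagen " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND), f_random";
        bsTableEnv.executeSql(insertDML);
    }
}
```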

Apart from configuring 'EnvironmentSettings', most of this code is SQL concatenated into Java strings. When the business logic is complex, this quickly becomes hard to maintain.

What we want is something simpler: a way to streamline the environment initialization and startup parameters of both 'DataStream' and 'Flink SQL' jobs. Ideally, a 'Flink SQL' job would need no Java code at all and no large blocks of SQL embedded in code. Can this be done more elegantly?

Absolutely.

StreamPark introduces the concept of unified program configuration. All the parameters a program needs, from development through deployment, are written into application.yml in a specific format, producing a general configuration template. When the program starts, this project configuration is passed to it, and the environment is initialized from it. This is the concept of the configuration file.

StreamPark provides a higher level of abstraction for Flink SQL. Developers only need to define their SQL in sql.yaml. When the program starts, sql.yaml is passed to the main program, and the SQL in it is loaded and executed automatically. This is the concept of the SQL file.

Terms

To make the discussion easier to follow, we use the following terms. The file into which all of a program's parameters, from development to deployment, are written in a specific format is called the project's configuration file.

The file that holds the SQL extracted from a Flink SQL job, sql.yaml, is called the project's SQL file.

Configuration file

In StreamPark, DataStream jobs and Flink SQL jobs share the same configuration file format, so a single file can define the configuration for both (for Flink SQL jobs the configuration file is optional). The file must be in YAML format and must be valid YAML.

The following example shows how to write this configuration file and what to pay attention to:


```yaml
flink:
  deployment:
    option:
      target: application
      detached:
      shutdownOnAttachedExit:
      jobmanager:
    property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
      $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
      pipeline.name: StreamPark QuickStart App
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
      pipeline:
        auto-watermark-interval: 200ms
      # checkpoint
      execution:
        checkpointing:
          mode: EXACTLY_ONCE
          interval: 30s
          timeout: 10min
          unaligned: false
          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
      # state backend
      state:
        backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); Flink 1.13+ accepts ('hashmap', 'rocksdb')
        backend.incremental: true
        checkpoint-storage: filesystem
        savepoints.dir: file:///tmp/chkdir
        checkpoints.dir: file:///tmp/chkdir
      # restart strategy
      restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
      restart-strategy.fixed-delay:
        attempts: 3
        delay: 5000
      restart-strategy.failure-rate:
        max-failures-per-interval:
        failure-rate-interval:
        delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
```

The above is the complete set of environment-related configuration that needs attention. All of it lives under the flink namespace and falls into two categories:

  • The configuration under deployment relates to deploying the project, that is, the parameters for the resources used when the application starts.
  • The rest is environment configuration that matters during development.

The environment settings that need attention during development are:

  • checkpoint
  • watermark
  • state backend
  • restart-strategy
  • table

Deployment

Deployment-related parameters are grouped under deployment, which has two parts:

  • option
  • property

option

The parameters of flink run are configured under option. The currently supported parameters are:

| Short Param | Full Param (prefix "--") | Effective | Value & Type | Description |
| --- | --- | --- | --- | --- |
| -t | target | Yes | yarn-per-job \| application | Deployment mode (only yarn-per-job and application are supported) |
| -d | detached | Yes | true \| false | Run in detached mode |
| -n | allowNonRestoredState | Yes | true \| false | Allow skipping savepoint state that cannot be restored |
| -sae | shutdownOnAttachedExit | Yes | true \| false | If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly (for example, Ctrl+C) |
| -m | jobmanager | Yes | yarn-cluster \| address | Address of the JobManager to connect to |
| -p | parallelism | No | int | Program parallelism |
| -c | class | No | String | Class with the program entry point ("main()" method) |

parallelism (-p): parallelism cannot be configured in option; set it in property (parallelism.default) instead.

class (-c): the program's main class cannot be configured in option; set it in property ($internal.application.main) instead.

Attention

Parameters under option must use their full names (for example detached, not d).
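To make the option rules concrete, here is a minimal Java sketch. It assumes the option section has already been parsed into a Map. The class FlinkRunOptions is hypothetical and is not StreamPark's implementation. The generated flags are the full parameter names from the table, prefixed with "--". Treating the three boolean options as on/off switches and skipping empty template entries are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: turns a parsed `option` section into `flink run` arguments.
class FlinkRunOptions {
    // Full names the table lists as effective in option.
    static final Set<String> SUPPORTED = Set.of(
            "target", "detached", "allowNonRestoredState", "shutdownOnAttachedExit", "jobmanager");
    // Assumption: boolean options become bare switches when true and are omitted when false.
    static final Set<String> SWITCHES = Set.of(
            "detached", "allowNonRestoredState", "shutdownOnAttachedExit");

    static List<String> toArgs(Map<String, String> option) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : option.entrySet()) {
            String name = e.getKey();
            String value = e.getValue();
            if (name.equals("parallelism") || name.equals("class")) {
                throw new IllegalArgumentException(name + " must be configured under property");
            }
            if (!SUPPORTED.contains(name)) {
                // Short names such as "d" or "t" are rejected: option needs full names.
                throw new IllegalArgumentException("unsupported or short option name: " + name);
            }
            if (value == null || value.isBlank()) {
                continue; // empty entries from the template are skipped
            }
            if (SWITCHES.contains(name)) {
                if (Boolean.parseBoolean(value)) {
                    args.add("--" + name);
                }
            } else {
                args.add("--" + name);
                args.add(value);
            }
        }
        return args;
    }

    public static void main(String[] a) {
        Map<String, String> option = new LinkedHashMap<>();
        option.put("target", "application");
        option.put("detached", "true");
        option.put("shutdownOnAttachedExit", null);
        System.out.println(toArgs(option)); // [--target, application, --detached]
    }
}
```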

property

The parameters under property are standard -D (dynamic property) parameters. They fall into two parts:

  • Basic parameters
  • Memory parameters

Basic parameters

There are many basic parameters. The five most important are:

| Key | Description | Required |
| --- | --- | --- |
| $internal.application.main | Class with the program entry point ("main()" method) | Yes |
| pipeline.name | Job name | Yes |
| yarn.application.queue | YARN queue | No |
| taskmanager.numberOfTaskSlots | Number of TaskManager slots | No |
| parallelism.default | Default program parallelism | No |

Attention

$internal.application.main and pipeline.name must be set.

If you need to set more parameters, refer to the Flink configuration documentation. These parameters must be placed under property and their names must be exactly correct. StreamPark resolves them automatically and applies them.
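Nested YAML keys under property can be mapped onto Flink's dotted keys. For example, heap.size under jobmanager.memory becomes jobmanager.memory.heap.size. The sketch below shows one way to do that mapping and to check the two required keys. It assumes a YAML library has already parsed the section into nested Maps (not shown). PropertyFlattener and its methods are hypothetical and do not describe StreamPark's internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: flattens a parsed `property` tree into dotted Flink keys.
class PropertyFlattener {
    static final List<String> REQUIRED = List.of("$internal.application.main", "pipeline.name");

    // Joins nested keys with '.', dropping entries whose value is empty.
    static Map<String, String> flatten(Map<?, ?> tree) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", tree, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<?, ?> node, Map<String, String> out) {
        for (Map.Entry<?, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                flattenInto(key, (Map<?, ?>) value, out);
            } else if (value != null && !value.toString().isBlank()) {
                out.put(key, value.toString());
            }
        }
    }

    // Fails fast when one of the two mandatory keys is missing.
    static void checkRequired(Map<String, String> flat) {
        for (String k : REQUIRED) {
            if (!flat.containsKey(k)) {
                throw new IllegalStateException("missing required property: " + k);
            }
        }
    }

    // Renders each entry in the -Dkey=value form.
    static List<String> toDynamicProperties(Map<String, String> flat) {
        List<String> args = new ArrayList<>();
        flat.forEach((k, v) -> args.add("-D" + k + "=" + v));
        return args;
    }
}
```

Keys that the YAML already writes in dotted form, such as pipeline.name, pass through unchanged. Only real nesting adds a prefix.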

Memory parameters

There are many memory parameters. The most common ones are:

| Key | Description |
| --- | --- |
| jobmanager.memory.heap.size | JVM Heap Memory size for JobManager. The minimum recommended JVM Heap size is 128 MB (134217728 bytes). |
| jobmanager.memory.off-heap.size | Off-heap Memory size for JobManager. This option covers all off-heap memory usage including direct and native memory allocation. The JVM direct memory limit of the JobManager process (-XX:MaxDirectMemorySize) will be set to this value if the limit is enabled by jobmanager.memory.enable-jvm-direct-memory-limit. |
| jobmanager.memory.jvm-metaspace.size | JVM Metaspace Size for the JobManager. |
| jobmanager.memory.jvm-overhead.min | Min JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value. |
| jobmanager.memory.jvm-overhead.max | Max JVM Overhead size for the JobManager. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value. |
| jobmanager.memory.jvm-overhead.fraction | Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less or greater than the configured min or max size, the min or max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min and max size to the same value. |
| taskmanager.memory.framework.heap.size | Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for TaskExecutor framework, which will not be allocated to task slots. |
| taskmanager.memory.task.heap.size | Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Framework Off-Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory. |
| taskmanager.memory.managed.size | Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory. |
| taskmanager.memory.managed.fraction | Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not explicitly specified. |
| taskmanager.memory.framework.off-heap.size | Framework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter. |
| taskmanager.memory.task.off-heap.size | Task Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter. |
| taskmanager.memory.jvm-metaspace.size | JVM Metaspace Size for the TaskExecutors. |

Similarly, if you want to configure more memory parameters, refer to the Flink memory configuration documentation. All memory settings, whether for total Flink process memory, the JobManager, or the TaskManager, must be placed under property to take effect.
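Two of the derivation rules in the table can be made concrete with a little arithmetic. JVM Overhead is a fraction of Total Process Memory, clamped to the configured min and max. Managed Memory, when managed.size is not set, is managed.fraction of Total Flink Memory. The Java sketch below is a simplified illustration of just these two rules, not Flink's full memory model. The class name and the example sizes (in MiB) are hypothetical; only managed.fraction 0.4 comes from the template.

```java
// Hypothetical sketch of two derivation rules from the memory table (sizes in MiB).
final class MemoryDerivation {
    // JVM Overhead = fraction x Total Process Memory, clamped to [min, max].
    // Setting min == max pins the overhead to that exact size.
    static double jvmOverheadMiB(double totalProcessMiB, double fraction, double minMiB, double maxMiB) {
        double derived = totalProcessMiB * fraction;
        return Math.max(minMiB, Math.min(maxMiB, derived));
    }

    // Managed Memory = managed.fraction x Total Flink Memory (when managed.size is unset).
    static double managedMiB(double totalFlinkMiB, double managedFraction) {
        return totalFlinkMiB * managedFraction;
    }

    public static void main(String[] args) {
        // Example values: 1024 x 0.1 = 102.4 is below the 192 minimum, so the minimum wins.
        System.out.println(jvmOverheadMiB(1024, 0.1, 192, 1024)); // 192.0
        // managed.fraction 0.4 of a hypothetical 1280 MiB Total Flink Memory.
        System.out.println(managedMiB(1280, 0.4)); // 512.0
    }
}
```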

Configure Total Memory

The total process memory of Flink JVM processes consists of memory consumed by the Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap and Off-heap (Direct or Native) memory.

The simplest way to set up memory in Flink is to configure either of the two following options:

| Item | TaskManager Config | JobManager Config |
| --- | --- | --- |
| Total Flink memory | taskmanager.memory.flink.size | jobmanager.memory.flink.size |
| Total process memory | taskmanager.memory.process.size | jobmanager.memory.process.size |

Attention

Explicitly configuring both total process memory and total Flink memory is not recommended. It may lead to deployment failures due to potential memory configuration conflicts. Configuring other memory components also requires caution as it can produce further configuration conflicts.
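The rule above is easy to check before a job is submitted. This is a minimal Java sketch that flags a component whose total Flink memory and total process memory are both set. It assumes the property section has already been flattened into dotted keys. TotalMemoryCheck is a hypothetical helper, not a StreamPark or Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-submission check: warns when both total-memory options are set.
final class TotalMemoryCheck {
    static List<String> conflicts(Map<String, String> flat) {
        List<String> problems = new ArrayList<>();
        for (String component : List.of("jobmanager", "taskmanager")) {
            String flinkKey = component + ".memory.flink.size";
            String processKey = component + ".memory.process.size";
            if (isSet(flat, flinkKey) && isSet(flat, processKey)) {
                problems.add(component + ": set only one of " + flinkKey + " and " + processKey);
            }
        }
        return problems;
    }

    // A key counts as set only if it has a non-blank value.
    private static boolean isSet(Map<String, String> flat, String key) {
        String v = flat.get(key);
        return v != null && !v.isBlank();
    }
}
```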

Checkpoint

Checkpointing is simple to configure with the following items:

| Item | Description | Value / Type |
| --- | --- | --- |
| execution.checkpointing.interval | Checkpoint interval | Duration |
| execution.checkpointing.timeout | Checkpoint timeout | Duration |
| execution.checkpointing.mode | Checkpointing semantics | EXACTLY_ONCE \| AT_LEAST_ONCE |
| execution.checkpointing.unaligned | Whether to use unaligned checkpoints | true \| false |

Watermark

For watermarks, you only need to set the watermark generation interval, pipeline.auto-watermark-interval.

State

```yaml
state:
  backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); Flink 1.13+ accepts ('hashmap', 'rocksdb')
  backend.incremental: true
  checkpoint-storage: filesystem
  savepoints.dir: file:///tmp/chkdir
  checkpoints.dir: file:///tmp/chkdir
```

These settings fall into two groups:

  • backend
  • checkpoints

backend

backend configures the state backend and follows the configuration rules in the official Flink documentation. The following items are supported:

| Item | Description | Value / Type | Effective for |
| --- | --- | --- | --- |
| state.backend | Type of state backend | hashmap \| rocksdb | |
| state.checkpoint-storage | The checkpoint storage implementation used to checkpoint state | jobmanager \| filesystem | |
| state.backend.incremental | Whether to enable incremental checkpoints | true \| false | rocksdb |

If the backend type is rocksdb, you may need to configure RocksDB further; refer to the official Flink documentation. Note that the official RocksDB options are prefixed with state.backend (for example state.backend.rocksdb.*), which is the same namespace used here, so the parameter names must match the official ones exactly.

Attention

The value item is a non-standard configuration item used to set the state storage type (jobmanager | filesystem | rocksdb). All other items are standard configuration and follow the official Flink specification.
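The backend rules can be written as a small validation sketch. It treats Flink 1.12 and earlier as accepting jobmanager, filesystem and rocksdb, and later releases as accepting hashmap and rocksdb. It also encodes the rule that state.backend.incremental only takes effect with rocksdb. StateBackendCheck is hypothetical. It models only these stated rules and does not account for any legacy aliases that newer Flink versions may still accept.

```java
import java.util.Set;

// Hypothetical validator for the state-backend rules described in this section.
final class StateBackendCheck {
    // Backend names per Flink 1.x minor version, as stated in this section.
    static Set<String> backendsFor(int flinkMinorVersion) {
        return flinkMinorVersion <= 12
                ? Set.of("jobmanager", "filesystem", "rocksdb")
                : Set.of("hashmap", "rocksdb");
    }

    static boolean isKnownBackend(int flinkMinorVersion, String backend) {
        return backendsFor(flinkMinorVersion).contains(backend);
    }

    // state.backend.incremental only has an effect with the rocksdb backend.
    static boolean incrementalApplies(String backend, boolean incremental) {
        return incremental && "rocksdb".equals(backend);
    }
}
```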

Restart Strategy

Flink has three restart strategies, which correspond to the three settings here:

```yaml
restart-strategy: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
restart-strategy.fixed-delay:
  attempts: 3
  delay: 5000
restart-strategy.failure-rate:
  max-failures-per-interval:
  failure-rate-interval:
  delay:
```

restart-strategy selects one of the following strategies:

  • fixed-delay
  • failure-rate
  • none

fixed-delay

| Item | Description | Value / Unit |
| --- | --- | --- |
| attempts | Number of times Flink attempts to restart the job | int (e.g. 3) |
| delay | How long to wait after a failure before restarting | none \| s \| m \| min \| h \| d |

Example:

```yaml
attempts: 5
delay: 3 s
```

That is, a failed job is restarted at most 5 times, with 3 seconds between restarts. If it still fails after 5 restarts, the job fails and exits.
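The fixed-delay behavior can be traced with a short simulation. This Java sketch counts restarts and the total delay without actually waiting. It uses a predicate to decide which runs succeed. FixedDelaySimulation is a hypothetical illustration of the semantics described above, not Flink's restart implementation.

```java
import java.util.function.IntPredicate;

// Hypothetical simulation of fixed-delay semantics; delays are summed, not slept.
final class FixedDelaySimulation {
    static final class Outcome {
        final boolean succeeded;
        final int restarts;
        final long waitedMillis;

        Outcome(boolean succeeded, int restarts, long waitedMillis) {
            this.succeeded = succeeded;
            this.restarts = restarts;
            this.waitedMillis = waitedMillis;
        }
    }

    // runSucceeds.test(n) tells whether run n succeeds (n = 0 is the first run).
    static Outcome simulate(int attempts, long delayMillis, IntPredicate runSucceeds) {
        int restarts = 0;
        for (int run = 0; ; run++) {
            if (runSucceeds.test(run)) {
                return new Outcome(true, restarts, restarts * delayMillis);
            }
            if (restarts == attempts) {
                // All restart attempts are used up: the job fails and exits.
                return new Outcome(false, restarts, restarts * delayMillis);
            }
            restarts++; // wait delayMillis, then restart
        }
    }

    public static void main(String[] args) {
        // attempts: 5, delay: 3 s, and a job that never recovers.
        Outcome o = simulate(5, 3_000L, run -> false);
        System.out.println(o.succeeded + " " + o.restarts + " " + o.waitedMillis); // false 5 15000
    }
}
```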

failure-rate

| Item | Description | Value / Unit |
| --- | --- | --- |
| max-failures-per-interval | Maximum number of restarts in the given time interval before the job fails | int (e.g. 3) |
| failure-rate-interval | Time interval for measuring the failure rate | none \| s \| m \| min \| h \| d |
| delay | Delay between two consecutive restart attempts | none \| s \| m \| min \| h \| d |

Example:

```yaml
max-failures-per-interval: 10
failure-rate-interval: 5 min
delay: 2 s
```

That is, the job waits 2 seconds before each restart. If the number of failures reaches 10 within 5 minutes, the job fails.
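The failure-rate rule amounts to a sliding window over failure timestamps. The Java sketch below models it with explicit timestamps instead of a clock. It ignores delay, which only spaces restarts apart. FailureRateSimulation is hypothetical. Its boundary choice (a failure exactly one interval old still counts) is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding-window model of the failure-rate strategy.
final class FailureRateSimulation {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> recentFailures = new ArrayDeque<>();

    FailureRateSimulation(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // Records a failure at nowMillis; true means the job may restart, false means it fails.
    boolean onFailure(long nowMillis) {
        recentFailures.addLast(nowMillis);
        // Forget failures that are older than the measurement interval.
        while (nowMillis - recentFailures.peekFirst() > intervalMillis) {
            recentFailures.removeFirst();
        }
        return recentFailures.size() < maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // max-failures-per-interval: 10, failure-rate-interval: 5 min, a failure every 2 s.
        FailureRateSimulation policy = new FailureRateSimulation(10, 5 * 60_000L);
        for (int i = 1; i <= 10; i++) {
            if (!policy.onFailure((i - 1) * 2_000L)) {
                System.out.println("job fails at failure " + i); // job fails at failure 10
                break;
            }
        }
    }
}
```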

None

With none, the job is not restarted and no further parameters need to be configured.

Unit suffix

Time intervals can be written with or without a unit suffix. A value without a suffix is treated as milliseconds. The available units are:

  • s second
  • m minute
  • min minute
  • h hour
  • d day
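The suffix rules can be sketched as a small parser. The input is an integer, optionally followed by a unit, and a bare number means milliseconds. IntervalParser is hypothetical and is not StreamPark's parser. Accepting "ms" goes beyond the list above. It is included only because the template's Flink keys use values such as 200ms, which Flink's own duration syntax accepts.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for intervals: an integer with an optional unit suffix.
final class IntervalParser {
    private static final Pattern FORMAT = Pattern.compile("\\s*(\\d+)\\s*(ms|s|min|m|h|d)?\\s*");

    static long toMillis(String text) {
        Matcher m = FORMAT.matcher(text.toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("unsupported interval: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2);
        if (unit == null || unit.equals("ms")) {
            return amount; // no suffix: milliseconds
        }
        switch (unit) {
            case "s":
                return amount * 1_000L;
            case "m":
            case "min":
                return amount * 60_000L;
            case "h":
                return amount * 3_600_000L;
            default: // "d", the only remaining unit the pattern allows
                return amount * 86_400_000L;
        }
    }
}
```

With this sketch, delay: 5000 and delay: 5 s describe the same 5-second delay.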

Table

The table section holds the Flink SQL configuration. The currently supported items are:

  • planner
  • mode
  • catalog
  • database

| Item | Description | Value |
| --- | --- | --- |
| planner | Table planner | blink \| old \| any |
| mode | Table mode | streaming \| batch |
| catalog | Catalog to use during initialization | |
| database | Database to use during initialization | |

Sql file

The SQL file must be in YAML format and follow YAML syntax. The SQL inside it uses a simple format:

```yaml
sql: |
  CREATE TABLE datagen (
    f_sequence INT,
    f_random INT,
    f_random_str STRING,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
  ) WITH (
    'connector' = 'datagen',
    -- optional options --
    'rows-per-second'='5',
    'fields.f_sequence.kind'='sequence',
    'fields.f_sequence.start'='1',
    'fields.f_sequence.end'='1000',
    'fields.f_random.min'='1',
    'fields.f_random.max'='1000',
    'fields.f_random_str.length'='10'
  );

  CREATE TABLE print_table (
    f_sequence INT,
    f_random INT,
    f_random_str STRING
  ) WITH (
    'connector' = 'print'
  );

  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
```

sql is the identifier of this SQL block and must be unique. The content that follows it is the SQL itself.

Attention

The | after sql: is required. It preserves the formatting of the whole block, including line breaks. StreamPark lets you define multiple SQL statements at once. Statements must be separated by semicolons, and each one must follow Flink SQL syntax and conventions.
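Splitting on semicolons has to skip semicolons that appear inside string literals or comments. This is a naive Java sketch of such a splitter. It handles single-quoted literals (including doubled '' escapes) and -- line comments, but not /* */ blocks. SqlSplitter is hypothetical, and StreamPark's actual parsing may work differently. Comment text is dropped from the output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitter: ';' ends a statement unless it is inside a '...' literal or a -- comment.
final class SqlSplitter {
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inString = false;
        boolean inComment = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (inComment) {
                if (c == '\n') {
                    inComment = false;
                    current.append(c); // keep the line break, drop the comment text
                }
            } else if (inString) {
                current.append(c);
                if (c == '\'') {
                    inString = false; // a doubled '' simply closes and reopens the literal
                }
            } else if (c == '\'') {
                inString = true;
                current.append(c);
            } else if (c == '-' && i + 1 < script.length() && script.charAt(i + 1) == '-') {
                inComment = true;
                i++; // skip the second '-'
            } else if (c == ';') {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String s = sb.toString().trim();
        if (!s.isEmpty()) {
            out.add(s);
        }
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE t (x INT) WITH ('connector' = 'datagen');\n"
                + "-- a comment; with a semicolon\n"
                + "INSERT INTO print_table SELECT x FROM t;";
        // Prints the CREATE TABLE statement, then the INSERT statement.
        split(script).forEach(System.out::println);
    }
}
```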

Summary

This chapter has described the configuration file and the SQL file in detail, which should give you a first impression of both. Later chapters explain how to use them.