Flink SQL stream-computing visual UI platform

The Flink SQL stream-computing visual UI platform that a friend of mine has independently developed over many years is genuinely easy to use. It can handle real MSP (hybrid-cloud) scenarios with multiple data sources and multiplexing. The product instructions follow; see whether any of the scenarios fit your use case.

1, Introduction

The flink-streaming-platform-web system is a visual web system built on top of Flink. Users only need to configure SQL in the web interface to complete a stream-computing task.

The main functions include task configuration, starting/stopping tasks, alarms, logs, and more. The goal is to reduce development effort and make Flink SQL stream-computing tasks fully configuration-driven.

Supports LOCAL mode, YARN per-job mode, and STANDALONE mode.

Supports UDFs, custom connectors, etc., and is fully compatible with the official connectors.

Currently, the flink version has been upgraded to 1.12

Source address https://github.com/zhp8341/flink-streaming-platform-web

Screenshots: see the GitHub repository.

The demos and documentation in this article may not be up to date; please refer to GitHub: https://github.com/zhp8341/flink-streaming-platform-web

2, Environment and installation

1. Environment

Operating system: Linux

Hadoop version: 2+

Flink version: 1.11.1 (official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/)

JDK version: 1.8

Scala version: 2.11

Kafka version: 1.0+

MySQL version: 5.6+

2. Application installation

1. Flink client installation

Download the corresponding version:

https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz, then unpack it.

a: /flink-1.11.1/conf

1. YARN_PER mode

Place the Hadoop client configuration files in this directory:

core-site.xml

yarn-site.xml

hdfs-site.xml

2. LOCAL mode

No extra configuration needed.

3. STANDALONE mode

No extra configuration needed.

For all three modes above, you need to modify flink-conf.yaml and set the class loading order to parent-first, i.e. classloader.resolve-order: parent-first, as in the snippet below.
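For reference, the relevant flink-conf.yaml entry:

    # flink-conf.yaml: resolve classes parent-first so that user jars do not
    # shadow Flink's own dependencies (see the janino error under Questions)
    classloader.resolve-order: parent-first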

b: /flink-1.11.1/lib -- Hadoop integration

Download flink-shaded-hadoop-2-uber-${xxx}.jar into lib.
Address: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

After that, execute: export HADOOP_CLASSPATH=`hadoop classpath`
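Putting this together, a minimal shell sketch (paths assume the unpack location used above):

    # place the shaded Hadoop uber jar into Flink's lib directory
    cd /flink-1.11.1/lib
    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    # expose the Hadoop client classpath to the Flink client
    export HADOOP_CLASSPATH=`hadoop classpath`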

2. flink-streaming-platform-web installation

Technology selection: Spring Boot 2.2.8.RELEASE

a: Download the latest release and unpack it: https://github.com/zhp8341/flink-streaming-platform-web/releases/

   tar -xvf flink-streaming-platform-web.tar.gz

b: Execute the MySQL statements

With MySQL 5.6 or above, create a database named flink_web, then execute the table-creation statements from https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql -- for example:
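A minimal sketch of this step, assuming the utf8mb4 character set (the table DDL itself comes from the linked flink_web.sql):

    -- create the metadata database used by the web platform
    CREATE DATABASE flink_web DEFAULT CHARACTER SET utf8mb4;
    -- then load the project's schema from the shell, e.g.:
    --   mysql -u<user> -p flink_web < flink_web.sql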

c: Modify the database connection configuration

Edit /flink-streaming-platform-web/conf/application.properties so that it points to the MySQL database created above, for example:
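For illustration, a sketch of the datasource entries, assuming the standard Spring Boot property names; check the keys actually present in the shipped application.properties:

    # conf/application.properties -- point the platform at the flink_web database
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8
    spring.datasource.username=<your mysql user>
    spring.datasource.password=<your mysql password>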

d: Start the web application

   cd /XXXX/flink-streaming-platform-web/bin
   Start: sh deploy.sh start
   Stop:  sh deploy.sh stop

Log directory: /XXXX/flink-streaming-platform-web/logs/

e: Log in

http://${ip or hostname}:9084/   e.g.: http://hadoop003:9084/
Login account: admin   Password: 123456

f: Cluster

If a clustered deployment is required, refer to the deployment diagram in the GitHub documentation (the figure is not reproduced here).

3, Function introduction

1. New task configuration description

a: Task name (* required)

The task name cannot exceed 50 characters and may contain only numbers, letters, and underscores.

b: Run mode

YARN_PER (YARN per-job mode: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn)

STANDALONE (standalone cluster: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/cluster_setup.html)

LOCAL (local cluster: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html)

LOCAL mode requires the Flink service to be started on the local machine with ./bin/start-cluster.sh, as sketched below.
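A minimal sketch of starting the local cluster (path as installed above):

    cd /flink-1.11.1
    ./bin/start-cluster.sh
    # the Flink web UI should then be reachable on port 8081 by default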

c: Flink run configuration

1. YARN_PER mode

The parameters are consistent with the official client, but only -p -yjm -yn -ytm -ys -yqu (required) are supported:

   -ys   number of slots
   -yn   number of task managers
   -yjm  heap memory size of the job manager
   -ytm  heap memory size of the task manager
   -yqu  YARN queue name
   -p    parallelism

See the official documentation for details. Example: -yqu flink -yjm 1024m -ytm 2048m -p 1 -ys 1

2. LOCAL mode

No configuration required.

3. STANDALONE mode

   -d,--detached                        If present, runs the job in detached mode
   -p,--parallelism <parallelism>       The parallelism with which to run the program.
                                        Optional flag to override the default value
                                        specified in the configuration.
   -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job from
                                        (for example hdfs:///flink/savepoint-1537)

Other run parameters can be listed with flink -h.

d: Checkpoint information

If left blank, the checkpoint mechanism is not enabled by default. Only the following parameters are supported:

   -checkpointInterval
   -checkpointingMode
   -checkpointTimeout
   -checkpointDir
   -tolerableCheckpointFailureNumber
   -asynchronousSnapshots

For example: -asynchronousSnapshots true -checkpointDir hdfs://Hccluster/flink/checkpoints/ (note the permissions of the current user)
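A fuller illustration of this field's value; the interval, timeout, and mode values below are assumptions for the sketch, not platform defaults:

    -checkpointInterval 60000 -checkpointingMode EXACTLY_ONCE -checkpointTimeout 600000 -asynchronousSnapshots true -checkpointDir hdfs://Hccluster/flink/checkpoints/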

For a description of each parameter's values, see the GitHub documentation.

e: UDF address

Only HTTP addresses are supported, e.g.: http://xxx.xxx.com/flink-streaming-udf.jar
After the address is filled in, the function can be registered directly in the SQL statement, e.g.: CREATE FUNCTION jsonHasKey AS 'com.yt.udf.JsonHasKeyUDF';

See UDF development demo for details https://github.com/zhp8341/flink-streaming-udf
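A minimal sketch of how the UDF address and CREATE FUNCTION fit together (the source table, column, and key names are hypothetical):

    -- the class must be packaged in the jar served at the configured UDF address
    CREATE FUNCTION jsonHasKey AS 'com.yt.udf.JsonHasKeyUDF';

    -- hypothetical usage: check whether the JSON payload contains a given key
    SELECT jsonHasKey(payload, 'user_id') FROM some_source;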

2. System settings

There are three required options among the system settings:

1. flink-streaming-platform-web application installation directory (required), e.g.: /root/flink-streaming-platform-web/

2. Flink installation directory (required) -- the Flink client directory, e.g.: /usr/local/flink-1.11.1/

3. HTTP address of the YARN RM -- the HTTP address of the Hadoop YARN ResourceManager, e.g.: http://hadoop003:8088/

4. flink_rest_http_address -- the Flink REST HTTP address used in LOCAL mode

5. flink_rest_ha_http_address -- if STANDALONE mode uses HA, multiple addresses can be entered, separated by semicolons (;)

3. Alarm settings

Alarm settings are used to send an alert when a running task dies. This uses DingTalk alarms; official documentation: https://help.aliyun.com/knowledge_detail/106247.html

In the DingTalk robot's security settings, the keyword must be set to: alarm

Screenshots: see the GitHub repository.

4, Configure demo

demo1: single-stream Kafka written to MySQL (reference)

demo2: dual-stream Kafka written to MySQL (reference)

demo3: Kafka joined with a MySQL dimension table in real time, written to MySQL (reference)

demo4: tumbling (scrolling) window

demo5: sliding window
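demo4 and demo5 are not reproduced here; as a hedged sketch, the two window styles look like this in Flink 1.11 SQL, reusing the flink_test_6 source defined in the demo below (window sizes are illustrative):

    -- demo4-style tumbling (scrolling) window: one result per 1-minute window
    SELECT TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start,
           SUM(amnount) AS total_amnount
    FROM flink_test_6
    GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE);

    -- demo5-style sliding window: 1-minute windows evaluated every 10 seconds
    SELECT HOP_START(proctime, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS window_start,
           SUM(amnount) AS total_amnount
    FROM flink_test_6
    GROUP BY HOP(proctime, INTERVAL '10' SECOND, INTERVAL '1' MINUTE);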

CREATE FUNCTION UTC2Local AS 'com.streaming.flink.udf.UTC2Local';

CREATE TABLE source_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '1000',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '1000',
  'fields.f_random_str.length' = '10'
);

CREATE TABLE print_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  t2 TIMESTAMP(6),
  t1 TIMESTAMP(6)
) WITH (
  'connector' = 'print'
);

INSERT INTO print_table SELECT f0, f1, f2, proctime AS t2, UTC2Local(proctime) AS t1 FROM source_table;
CREATE FUNCTION jsonHasKey AS 'com.xx.udf.JsonHasKeyUDF';

-- if a UDF function is used, the UDF address must be configured


CREATE TABLE flink_test_6 (
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME()
)
WITH (
  'connector.properties.zookeeper.connect' = 'hadoop001:2181',
  'connector.version' = 'universal',
  'connector.topic' = 'flink_test_6',
  'connector.startup-mode' = 'earliest-offset',
  'format.derive-schema' = 'true',
  'connector.type' = 'kafka',
  'update-mode' = 'append',
  'connector.properties.bootstrap.servers' = 'hadoop003:9092',
  'connector.properties.group.id' = 'flink_gp_test1',
  'format.type' = 'json'
);


CREATE TABLE flink_test_6_dim (
  id BIGINT,
  coupon_amnount BIGINT
)
WITH (
   'connector.type' = 'jdbc',
   'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
   'connector.table' = 'test_dim',
   'connector.username' = 'flink_web_test',
   'connector.password' = 'flink_web_test_123',
   'connector.lookup.max-retries' = '3'
);


CREATE TABLE sync_test_3 (
   day_time STRING,
   total_gmv BIGINT
) WITH (
   'connector.type' = 'jdbc',
   'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
   'connector.table' = 'sync_test_3',
   'connector.username' = 'flink_web_test',
   'connector.password' = 'flink_web_test_123'
);


INSERT INTO sync_test_3
SELECT
  day_time,
  SUM(amnount - coupon_amnount) AS total_gmv
FROM
  (
    SELECT
      a.day_time AS day_time,
      a.amnount AS amnount,
      b.coupon_amnount AS coupon_amnount
    FROM
      flink_test_6 AS a
      LEFT JOIN flink_test_6_dim FOR SYSTEM_TIME AS OF a.proctime AS b
      ON b.id = a.id
  )
GROUP BY day_time;

5, Support for the official Flink SQL syntax

Fully compatible with the connector-related configuration of Flink 1.11.1.

See

http://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html

6, Other

1. Because Hadoop cluster environments differ, deployment can be difficult and the overall setup is time-consuming.

2. Because ES and HBase versions differ, you may need to download the source code and rebuild against the matching versions. Source address: https://github.com/zhp8341/flink-streaming-platform-web

7, Questions

1.

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.

Could not build the program from JAR file.

Use the help option (-h or --help) to get help on the command.

Solution:

   export HADOOP_HOME=/etc/hadoop
   export HADOOP_CONF_DIR=/etc/hadoop/conf
   export HADOOP_CLASSPATH=`hadoop classpath`

   source /etc/profile

It is best to make these permanent environment variables (for example, in /etc/profile).

2.

2020-10-09 14:48:22,060 ERROR com.flink.streaming.core.JobApplication - Task execution failed:
java.lang.IllegalStateException: Unable to instantiate java compiler
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
        at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
        at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        at com.flink.streaming.core.JobApplication.callDml(JobApplication.java:138)
        at com.flink.streaming.core.JobApplication.main(JobApplication.java:85)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
        at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
        at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
        ... 60 more

This janino ClassCastException is the user-jar classloading conflict that the classloader.resolve-order: parent-first setting in flink-conf.yaml (see the installation section) is meant to avoid.

Primary log directories

1. Web system log

/{installation directory}/flink-streaming-platform-web/logs/

2. Flink client command log

${FLINK_HOME}/log/flink-${USER}-client-*.log

8, RoadMap

1. Support connectors beyond the official ones

2. Support Flink Session mode

9, Life

To finish, the morning scenery of Lugu Lake -- a sacred place you must visit.

No matter how tiring work gets, pay attention to your health. The times reward the brave, so treat balancing work and rest as part of ordinary life. We are all civilized people and we all know overtime is unavoidable; I just hope everyone (the bosses included) stays tolerant about it [tears]. When we were young we wanted to travel and play, but reality handed us all kinds of excuses. Brothers, dare to push forward -- this is our moment. Let's go!

Posted by JetJagger on Tue, 31 May 2022 05:50:53 +0530