Build a personal blog website

2023-01-24   ES  

Reprinted from: https://developer.aliyun.com/article/712704

Brief introduction: This article walks through five ways of submitting Flink jobs. Mastering these submission methods helps improve day-to-day development and operations efficiency.

1. Environment description

The previous course covered setting up and using the Flink development environment. Today's course focuses on Flink client operations and is mostly hands-on. It is based on the community release Flink 1.7.2, running on macOS with the Google Chrome browser. For preparing and deploying the development environment, refer to the earlier article on setting up the Flink development environment and on configuring, deploying, and running applications.

2. Course summary

As shown in the figure below, Flink provides a rich set of client operations for submitting jobs and interacting with them: the Flink command line, Scala Shell, SQL Client, the RESTful API, and the web interface. The command line is the most important of these. SQL Client is used to submit SQL jobs, and Scala Shell is used to submit Table API jobs. Flink also exposes a RESTful service that users can call over HTTP, and jobs can be submitted through the web interface as well.

In the bin directory of the Flink installation you can see flink, start-scala-shell.sh, sql-client.sh, and other files. These are the entry points for client operations.

3. Flink client operation

3.1 Flink command line

To see the full description of Flink's command-line parameters, enter flink -h:

  flink-1.7.2 bin/flink -h

To see the parameters of a specific command, such as run, enter:

  flink-1.7.2 bin/flink run -h

This article covers only some common operations. For more detail, refer to the official Flink command line documentation.

3.1.1 Standalone

First start a standalone cluster:

  flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

Open http://127.0.0.1:8081 to see the web interface.
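The RESTful service mentioned in the course summary is served from the same address as the web interface. The following is a minimal sketch of querying it from the shell, assuming curl is installed and the cluster listens on 127.0.0.1:8081 as above. The helper names flink_rest_url and flink_rest_get and the FLINK_REST variable are made up for this illustration; /jobs/overview is one of the monitoring REST paths.

```shell
# Base address of the JobManager REST endpoint (same host:port as the web UI).
# FLINK_REST is a hypothetical variable introduced for this sketch.
FLINK_REST="${FLINK_REST:-http://127.0.0.1:8081}"

# Build the full URL for a REST path such as /jobs/overview.
# Tolerates a trailing slash on the base and a leading slash on the path.
flink_rest_url() {
  printf '%s/%s\n' "${FLINK_REST%/}" "${1#/}"
}

# Fetch a REST resource as JSON; needs curl and a running cluster.
flink_rest_get() {
  curl -s "$(flink_rest_url "$1")"
}
```

With the standalone cluster running, flink_rest_get /jobs/overview should return a JSON summary of the jobs known to the cluster.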

Run a job, using the TopSpeedWindowing example that ships with Flink:

  flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de

By default the job runs with a parallelism of 1:

Click "Task Manager" on the left, then click "Stdout" to see the output log:

Or view the *.out file in the local log directory:

List

View task list:

  flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
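When scripting against the cluster, the JobID usually has to be pulled out of this listing. A minimal sketch, assuming the output keeps the "date : JobID : name (STATUS)" layout shown above; the function name running_job_ids is made up for this illustration.

```shell
# Print the JobID of every job that the listing reports as RUNNING.
# Reads the output of `flink list` on stdin, so it can be tried without a cluster.
running_job_ids() {
  # Fields are separated by " : "; the timestamp's colons have no surrounding spaces.
  awk -F' : ' '/\(RUNNING\)[[:space:]]*$/ { print $2 }'
}
```

Typical use would be bin/flink list -m 127.0.0.1:8081 | running_job_ids, which prints one JobID per line.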

Stop

Stop a job. Use -m to specify the host address and port of the JobManager that runs it.

  flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.
------------------------------------------------------------
The program finished with the following exception:
  org.apache.flink.util.FlinkException: Could not stop the job   d67420e52bd051fae2fddbaa79e046bb.
  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
  at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
  at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
  at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
  at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

The log shows that the stop command failed. A job can be stopped only if all of its sources are stoppable, that is, they implement the StoppableFunction interface.

/**
 * Functions that need to be stoppable must implement this interface, for example the sources of a streaming job.
 * The stop() method is called when the job receives the STOP signal.
 * A source must stop emitting new data after receiving this signal.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source to shut down gracefully.
     * Pending data can still be emitted; the source does not need to stop immediately.
     */
    void stop();
}

Cancel

Cancel a job. If state.savepoints.dir is configured in conf/flink-conf.yaml, a savepoint is saved; otherwise no savepoint is saved.

  flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de
 
Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de.

You can also explicitly specify the savepoint directory when cancelling.

  flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759
 
Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint.
Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7.
 
  flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7
total 32K
-rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata

The differences between cancelling and stopping a (streaming) job are as follows:

  • With a cancel() call, the operators of the job immediately receive a cancel() method call so that they are cancelled as soon as possible. If the operators do not stop after the cancel() call, Flink periodically interrupts the operator threads until all operators have stopped.
  • A stop() call is a more graceful way of stopping a running streaming job. It is only available for jobs whose sources implement the StoppableFunction interface. When the user requests a stop, all sources of the job receive a stop() method call, and the job keeps running until all sources have shut down properly. This allows the job to finish processing all in-flight data.
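The two behaviours above can be combined in a small wrapper that attempts a graceful stop and falls back to cancel when the job is not stoppable. This is only a sketch: it assumes the flink launcher exits with a non-zero status when stop fails, and the function name stop_or_cancel and the FLINK variable are made up for this illustration.

```shell
# Path to the flink launcher; override FLINK to point at another installation.
FLINK="${FLINK:-bin/flink}"

# Try a graceful stop first; if that fails (for example because the job's
# sources do not implement StoppableFunction), cancel the job instead.
# Usage: stop_or_cancel <jobmanager host:port> <job id>
stop_or_cancel() {
  if "$FLINK" stop -m "$1" "$2" >/dev/null 2>&1; then
    echo "stopped $2"
  elif "$FLINK" cancel -m "$1" "$2" >/dev/null 2>&1; then
    echo "cancelled $2"
  else
    echo "could not stop or cancel $2" >&2
    return 1
  fi
}
```

Falling back automatically is a design choice that suits throwaway test jobs. For production jobs it may be preferable to fail loudly and cancel with a savepoint by hand.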

Savepoint

Trigger a savepoint.

  flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.
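In a script, the savepoint path has to be read back from this output before it can be handed to the run command. A minimal sketch, assuming the "Savepoint completed. Path: ..." line keeps the format shown above; the function name savepoint_path is made up for this illustration.

```shell
# Print the savepoint path reported by `flink savepoint`.
# Reads the command output on stdin and prints nothing if no savepoint completed.
savepoint_path() {
  sed -n 's/^Savepoint completed\. Path: //p'
}

# Example use against a running cluster (commented out so the sketch has no side effects):
# path=$(bin/flink savepoint -m 127.0.0.1:8081 "$job_id" /tmp/savepoint | savepoint_path)
# bin/flink run -d -s "$path" ./examples/streaming/TopSpeedWindowing.jar
```

The printed value keeps the file: scheme exactly as Flink reported it.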

Note: the differences between savepoints and checkpoints (see the documentation for details):

  • Checkpoints are incremental, so each one takes less time and involves less data. Once enabled in the program they are triggered automatically, without the user needing to be aware of them. A checkpoint is used automatically on job failover and does not need to be specified by the user.
  • Savepoints are full snapshots, so each one takes longer and involves more data, and they must be triggered by the user. Savepoints are typically used for program upgrades (see the documentation for details), bug fixes, A/B tests, and similar scenarios, and the user must specify which one to restore from.

Use the -s parameter to start a job from a specified savepoint:

  flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.

The JobManager log then contains entries like these:

2019-03-28 10:30:53,957 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.

Modify

Modify the parallelism of a job.

To make the demonstration easier, edit conf/flink-conf.yaml to change the number of task slots from the default 1 to 4 and to configure the savepoint directory. (The modify command is supposed to accept -s followed by a savepoint path, but the current version may have a bug and reports that the option is not recognized.)

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint

After changing these parameters, restart the cluster for them to take effect, then start the job:

  flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local.
Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local.
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
 
  flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa

The web page now shows 4 task slots, while the job's default parallelism is still 1.

Use the modify command to change the parallelism to 4 and then to 3. Each modify command triggers a savepoint.

  flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.
 
  flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/
 
  flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.
 
  flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/

The JobManager log shows:

2019-06-17 09:05:11,179 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.

Info

The info command shows the execution plan (StreamGraph) of a Flink job.

  flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
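For scripted use, the JSON plan can be separated from the dashed rules around it. A minimal sketch, assuming the plan is printed on a single line starting with "{" as in the output above; the function name plan_json and the file name plan.json are made up for this illustration.

```shell
# Print only the JSON execution plan from the output of `flink info`.
# Reads the command output on stdin and drops the banner and rule lines.
plan_json() {
  sed -n '/^{/p'
}

# Example use (commented out so the sketch has no side effects):
# bin/flink info examples/streaming/TopSpeedWindowing.jar | plan_json > plan.json
```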

Copy the JSON output and paste it into this website: http://flink.apache.org/visualizer/

Compare it with the actual physical execution plan:
