百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink Docker Compose mysql binlog 同步到es

nanshan 2025-05-21 15:21 10 浏览 0 评论

系统ubuntu es版本7.18 flink:1.17.2

目录

mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
#资源情况
mysql8.0 Elasticsearch7.18 购买的google云的

# 目录结构
/usr/project/flink/
/usr/project/flink/
├── conf/
│   ├── flink-conf.yaml
│   └── log4j2.xml
├── job/
│   ├── flink-connector-elasticsearch7-3.0.1-1.17.jar
│   ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│   ├── flink-sql-connector-mysql-cdc-3.1.1.jar
│   └── win_user.sql
├── logs/
└── docker-compose.yml

docker-compose.yml

vim /usr/project/flink/docker-compose.yml

使用云es

services:
  jobmanager:
    image: flink:1.17.2
    restart: always
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    mem_limit: 4g
    cpu_shares: 1024
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  taskmanager:
    image: flink:1.17.2
    restart: always
    container_name: flink-taskmanager
    depends_on:
      - jobmanager
    command: >
      taskmanager
      -XX:MaxMetaspaceSize=512m
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    mem_limit: 8g
    cpu_shares: 1024
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

volumes:
  es_data:

networks:
  flink-network:
    driver: bridge

本地创建es kibana

version: '3.8'

services:
  jobmanager:
    image: flink:1.17.2
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log

  taskmanager:
    image: flink:1.17.2
    container_name: flink-taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.2
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=123456
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - flink-network

  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.2
    container_name: kibana
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    ports:
      - "5601:5601"
    networks:
      - flink-network

volumes:
  es_data:

networks:
  flink-network:
    driver: bridge

es验证

curl -u elastic:123456 http://localhost:9200

Flink SQL Job 示例

文件
/usr/project/flink/job/win_user.sql

存量增量模式

scan.startup.mode 设置为 'initial',以从表的初始状态开始读取数据,然后再进行增量同步

将其设置为 'latest-offset',以从最新的偏移量开始读取数据,实现增量同步

验证表是否成功创建

/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM source_win_user LIMIT 10;
#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM source_win_user LIMIT 10;

#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM source_win_user LIMIT 10;

配置模块

vim /usr/project/flink/job/win_user.sql

CREATE TABLE source_win_user (
  id INT,
  username STRING,
  merchant_id INT,
  avatar STRING,
  fcoin DECIMAL(15,4),
  coin_commission DECIMAL(15,4),
  level_id TINYINT,
  role TINYINT,
  is_promoter TINYINT,
  flag INT,
  real_name STRING,
  signature STRING,
  birthday STRING,
  area_code STRING,
  mobile STRING,
  email STRING,
  sex TINYINT,
  bind_bank TINYINT,
  address STRING,
  score INT,
  promo_code STRING,
  id_path STRING,
  sup_uid_1 INT,
  sup_username_1 STRING,
  sup_uid_2 INT,
  sup_uid_3 INT,
  sup_uid_4 INT,
  sup_uid_5 INT,
  sup_uid_6 INT,
  sup_uid_top INT,
  sup_username_top STRING,
  sup_level_top INT,
  password_hash STRING,
  password_coin STRING,
  ip STRING,
  third_login_type STRING,
  ip_region STRING,
  status TINYINT,
  last_login_ip STRING,
  last_login_ip_region STRING,
  last_login_time INT,
  last_login_device_id STRING,
  created_at INT,
  updated_at INT,
  freeze_cause STRING,
  freeze_at INT,
  operator_name STRING,
  fb_pid STRING,
  fb_cid STRING,
  created_name STRING,
  memberType TINYINT,
  google_sub_id STRING,
  facebook_sub_id STRING,
  secret STRING,
  code_url STRING,
  code_status TINYINT,
  user_type TINYINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'main',
  'password' = '123456',
  'database-name' = 'main',
  'table-name' = 'win_user',
  'scan.startup.mode' = 'initial',          -- 读取存量数据
  'debezium.snapshot.mode' = 'never',       -- 使用快照模式initial  增量模式never 增量模式
  'scan.incremental.snapshot.enabled' = 'true'  -- 启用增量同步
);


CREATE TABLE es_sink_table_win_user (
  id INT,
  username STRING,
  merchant_id INT,
  avatar STRING,
  fcoin DECIMAL(15,4),
  coin_commission DECIMAL(15,4),
  level_id TINYINT,
  role TINYINT,
  is_promoter TINYINT,
  flag INT,
  real_name STRING,
  signature STRING,
  birthday STRING,
  area_code STRING,
  mobile STRING,
  email STRING,
  sex TINYINT,
  bind_bank TINYINT,
  address STRING,
  score INT,
  promo_code STRING,
  id_path STRING,
  sup_uid_1 INT,
  sup_username_1 STRING,
  sup_uid_2 INT,
  sup_uid_3 INT,
  sup_uid_4 INT,
  sup_uid_5 INT,
  sup_uid_6 INT,
  sup_uid_top INT,
  sup_username_top STRING,
  sup_level_top INT,
  password_hash STRING,
  password_coin STRING,
  ip STRING,
  third_login_type STRING,
  ip_region STRING,
  status TINYINT,
  last_login_ip STRING,
  last_login_ip_region STRING,
  last_login_time INT,
  last_login_device_id STRING,
  created_at INT,
  updated_at INT,
  freeze_cause STRING,
  freeze_at INT,
  operator_name STRING,
  fb_pid STRING,
  fb_cid STRING,
  created_name STRING,
  memberType TINYINT,
  google_sub_id STRING,
  facebook_sub_id STRING,
  secret STRING,
  code_url STRING,
  code_status TINYINT,
  user_type TINYINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'https://127.0.0.1:9243',
  'username' = 'elastic',
  'password' = '123456',
  'index' = 'win_user',  -- 确保索引名称与 Elasticsearch 中的匹配
  'sink.bulk-flush.interval' = '1s',
  'sink.bulk-flush.backoff.max-retries' = '3',   -- 设置最大重试次数
  'sink.bulk-flush.max-actions' = '100', -- 一条数据也会同步不等待
  'sink.bulk-flush.max-size' = '1mb', -- 达到 1MB 或 200 条数据时批量 flush
  'sink.bulk-flush.backoff.delay' = '100ms',       -- 设置重试的延迟
  'sink.bulk-flush.backoff.strategy' = 'constant'  -- 重试策略
);

-- 3. 执行数据插入任务
INSERT INTO es_sink_table_win_user
SELECT * FROM source_win_user;

验证

/opt/flink/bin/sql-client.sh embedded
#验证
SHOW TABLES;
desc es_sink_table_win_user;
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS source_win_user;

# Flink 1.17 中,您可以使用以下命令查看已注册的连接器
SHOW TABLES;
#作业状态
SHOW JOBS;
#详情
EXPLAIN SELECT * FROM source_win_user;

SELECT * FROM source_win_user LIMIT 10;

优化配置 必须要配置

/opt/flink/bin/sql-client.sh embedded
#增加的 Session 全局配置(SET)
SET execution.checkpointing.interval = '1s';
SET restart-strategy = 'fixed-delay';
SET restart-strategy.fixed-delay.attempts = '3';
SET restart-strategy.fixed-delay.delay = '5s';
SET parallelism.default = 4;
SET state.backend = 'rocksdb';
SET state.backend.rocksdb.memory.managed = 'true';
SET execution.parallelism = 8;

#-- 提交作业时设置  Sink 的并行度提升
SET parallelism.default = 2; 
#最高作业任务
SET execution.parallelism = 8;
#查看验证配置
SET;

Flink 日志配置

mkdir -p /usr/project/flink/conf

vim /usr/project/flink/conf/flink-conf.yaml

任务曹 就是比如你需要创建10个作业 必须设置大于10

taskmanager.numberOfTaskSlots: 10

jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 10
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
rest.port: 8081

state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
execution.checkpointing.interval: 5000ms
execution.checkpointing.tolerable-failed-checkpoints: 1
execution.checkpointing.externalized-checkpoints: RETAIN_ON_CANCELLATION

日志

vim /usr/project/flink/conf/log4j2.xml

<Configuration status="warn">
  <Appenders>
    <RollingFile name="RollingFile" fileName="/opt/flink/logs/flink.log"
                 filePattern="/opt/flink/logs/${date:yyyy-MM}/flink-%d{yyyy-MM-dd}.log">
      <PatternLayout>
        <Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n</Pattern>
      </PatternLayout>
      <Policies>
        <SizeBasedTriggeringPolicy size="100MB"/>
      </Policies>
    </RollingFile>
  </Appenders>

  <Loggers>
    <Root level="info">
      <AppenderRef ref="RollingFile"/>
    </Root>
  </Loggers>
</Configuration>


连接器下载配置


flink-connector-elasticsearch包官方下载地址
https://repo1.maven.org/maven2/org/apache/flink/ 要选对版本 es7.17

flink-1.17.2

cd /usr/project/flink/job

#删除当前目录除win_user.sql其他的文件
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +

# MySQL CDC
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar

# 补充依赖
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar

启动并运行作业

Es 先创建对应的index 这一步不需要 测试的时候可以用下

#先创建对应的index
curl -X PUT "https://127.0.0.1:9243/win_user" \
  -u "elastic:123456" \
  -H 'Content-Type: application/json' \
  -d '{
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "id": { "type": "integer" },
        "username": { "type": "keyword" },
        "merchant_id": { "type": "integer" },
        "avatar": { "type": "keyword" },
        "fcoin": { "type": "double" },
        "coin_commission": { "type": "double" },
        "level_id": { "type": "byte" },
        "role": { "type": "byte" },
        "is_promoter": { "type": "byte" },
        "flag": { "type": "integer" },
        "real_name": { "type": "keyword" },
        "signature": { "type": "text" },
        "birthday": { "type": "keyword" },
        "area_code": { "type": "keyword" },
        "mobile": { "type": "keyword" },
        "email": { "type": "keyword" },
        "sex": { "type": "byte" },
        "bind_bank": { "type": "byte" },
        "address": { "type": "text" },
        "score": { "type": "integer" },
        "promo_code": { "type": "keyword" },
        "id_path": { "type": "keyword" },
        "sup_uid_1": { "type": "integer" },
        "sup_username_1": { "type": "keyword" },
        "sup_uid_2": { "type": "integer" },
        "sup_uid_3": { "type": "integer" },
        "sup_uid_4": { "type": "integer" },
        "sup_uid_5": { "type": "integer" },
        "sup_uid_6": { "type": "integer" },
        "sup_uid_top": { "type": "integer" },
        "sup_username_top": { "type": "keyword" },
        "sup_level_top": { "type": "integer" },
        "password_hash": { "type": "keyword" },
        "password_coin": { "type": "keyword" },
        "ip": { "type": "keyword" },
        "third_login_type": { "type": "keyword" },
        "ip_region": { "type": "keyword" },
        "status": { "type": "byte" },
        "last_login_ip": { "type": "keyword" },
        "last_login_ip_region": { "type": "keyword" },
        "last_login_time": { "type": "integer" },
        "last_login_device_id": { "type": "keyword" },
        "created_at": { "type": "integer" },
        "updated_at": { "type": "integer" },
        "freeze_cause": { "type": "text" },
        "freeze_at": { "type": "integer" },
        "operator_name": { "type": "keyword" },
        "fb_pid": { "type": "keyword" },
        "fb_cid": { "type": "keyword" },
        "created_name": { "type": "keyword" },
        "memberType": { "type": "byte" },
        "google_sub_id": { "type": "keyword" },
        "facebook_sub_id": { "type": "keyword" },
        "secret": { "type": "keyword" },
        "code_url": { "type": "keyword" },
        "code_status": { "type": "byte" },
        "user_type": { "type": "byte" }
      }
    }
  }'

插入测试数据到es

curl -X POST "https://127.0.0.1:9243/win_user/_doc/" \
  -u "elastic:123456" \
  -H 'Content-Type: application/json' \
  -d '{
    "id": 12,
    "username": "user1",
    "merchant_id": 123,
    "avatar": "avatar1",
    "fcoin": 100.5,
    "coin_commission": 5.2,
    "level_id": 1,
    "role": 2,
    "is_promoter": 1,
    "flag": 0,
    "real_name": "User One",
    "signature": "Sample signature",
    "birthday": "1990-01-01",
    "area_code": "12345",
    "mobile": "1234567890",
    "email": "user1@example.com",
    "sex": 1,
    "bind_bank": 1,
    "address": "123 Street Name",
    "score": 1000,
    "promo_code": "PROMO124",
    "id_path": "1/2/3",
    "sup_uid_1": 1,
    "sup_username_1": "sup1",
    "password_hash": "hashed_password",
    "password_coin": "coin_hash",
    "ip": "192.168.1.1",
    "third_login_type": "google",
    "status": 1,
    "last_login_ip": "192.168.1.1",
    "last_login_time": 1672530000,
    "created_at": 1672530000,
    "updated_at": 1672530000,
    "freeze_cause": "none",
    "freeze_at": 0,
    "operator_name": "admin",
    "fb_pid": "fb12345",
    "fb_cid": "fb67890",
    "created_name": "admin",
    "memberType": 1,
    "google_sub_id": "google123",
    "facebook_sub_id": "fb123",
    "secret": "secret_code",
    "code_url": "http://example.com",
    "code_status": 1,
    "user_type": 1
  }'

启动 Flink 集群

cd /usr/project/flink
docker-compose down -v
docker-compose up -d


执行 SQL CLI

docker exec -it flink-jobmanager /bin/bash

# 在容器内执行
cd /opt/flink
#启动win_user 任务
/opt/flink/bin/sql-client.sh embedded   -j /opt/flink/job/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar   -j /opt/flink/job/flink-connector-elasticsearch-base-3.0.1-1.17.jar   -j /opt/flink/job/flink-sql-connector-mysql-cdc-2.4.1.jar   -j /opt/flink/job/httpclient-4.5.13.jar   -j /opt/flink/job/httpcore-4.4.13.jar   -j /opt/flink/job/commons-codec-1.11.jar   -j /opt/flink/job/commons-logging-1.2.jar   -f /opt/flink/job/win_user.sql

  
#查看作业列表
./bin/flink list
#取消指定的作业
./bin/flink cancel <JobID>
./bin/flink cancel 7d3022995c94511d477c3be5d1794168

#任务详情
./bin/flink info 7d3022995c94511d477c3be5d1794168
#查看flink sql库表创建情况
SHOW TABLES;
#必须要发现里面有对应的表 才是成功的
Flink SQL> show tables;
+------------------------+
|             table name |
+------------------------+
| es_sink_table_win_user |
|    source_win_user |
+------------------------+
2 rows in set
#查看作业
show jobs;

es验证

如何快速确定手动插入的sql是否成功 我发现count 类增不会试试更新 kibana下面方法验证即可

GET win_user/_search
{
  "query": {
    "wildcard": {
      "username": {
        "value": "test001015"
      }
    }
  }
}

下面的查询可能有延迟 不建议使用

#启动后宿主机查看是否写入
#查看index列表
curl -u 'elastic:123456' -X GET "https://127.0.0.1.io:9243/_cat/indices?v"
#查看win_user index
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/_cat/indices/win_user?v"

#查看win_index索引结构
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/win_user/_mapping"

Es调优

检查副本分片的分配问题

curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/_cat/shards?v"

重新分配副本分片

curl -u 'elastic:123456' -X POST "https://127.0.0.1:9243/_cluster/reroute" -H 'Content-Type: application/json' -d '{
  "commands": [
    {
      "allocate_empty_primary": {
        "index": "win_user",
        "shard": 0,
        "node": "node_name",
        "accept_data_loss": true
      }
    }
  ]
}'

调整副本数配置

curl -u 'elastic:123456' -X PUT "https://127.0.0.1:9243/win_user/_settings" -H 'Content-Type: application/json' -d '{
  "settings": {
    "number_of_replicas": 0
  }
}'


账户

prd

http://127.0.0.1:8081
cooper
cooper

mysql配置检查

mysql -h 127.0.0.1 -u root -p
123456
#root账户配置main
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'main'@'%';
FLUSH PRIVILEGES;
#Flink CDC 所需的最小权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'main'@'%';
#验证
MySQL [(none)]> SHOW GRANTS FOR 'main'@'%';
+------------------------------------------------------------------------------+
| Grants for main@%                                                        |
+------------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `main`@`%` |
| GRANT `cloudsqlsuperuser`@`%` TO `main`@`%`   

SHOW VARIABLES LIKE 'log_bin';              -- 应为 ON
SHOW VARIABLES LIKE 'binlog_format';        -- 应为 ROW
SHOW VARIABLES LIKE 'binlog_row_image';     -- 应为 FULL

Elasticsearch数据验证

#真实生产连接地址
#账户
elastic
#密码
123456
#es url
127.0.0.1:9243
#测试命令
curl -u elastic:123456 -k https://127.0.0.1:9243

启动 Flink CDC

启动 Flink 集群

cd /usr/project/flink
#启动集群
docker-compose down -v && docker-compose up -d
#验证容器状态
docker ps -a | grep flink

# 在容器内执行 
cd /opt/flink
/opt/flink/bin/sql-client.sh embedded   -j /opt/flink/job/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar   -j /opt/flink/job/flink-connector-elasticsearch-base-3.0.1-1.17.jar   -j /opt/flink/job/flink-sql-connector-mysql-cdc-2.4.1.jar   -j /opt/flink/job/httpclient-4.5.13.jar   -j /opt/flink/job/httpcore-4.4.13.jar   -j /opt/flink/job/commons-codec-1.11.jar   -j /opt/flink/job/commons-logging-1.2.jar   -f /opt/flink/job/win_user.sql

#查看作业列表   
./bin/flink list
  
#查看作业列表
./bin/flink list
#取消指定的作业
./bin/flink cancel <JobID>
./bin/flink cancel 7d3022995c94511d477c3be5d1794168
  
#宿主机查询作业jobid
  curl http://localhost:8081/jobs/f54e8909ed2c1a19b49ed3788f6454fe

进入 Flink SQL CLI

#Flink jobmanager 容器
docker exec -it flink-jobmanager /bin/bash
#进入容器后,运行 Flink SQL 客户端来提交 SQL 作业
/opt/flink/bin/sql-client.sh
#提交 SQL 脚本任务
/opt/flink/bin/sql-client.sh --embedded --init-file /flink/sql/mysql_to_es.sql
#查看
/opt/flink/bin/flink list
#查看作业id
/opt/flink/bin/flink info <job_id>

#查看作业状态
http://127.0.0.1:8081/#/job/running

端口开放

8081

配置nignx安全登录

安装 htpasswd 工具

#安装nginx
sudo apt update
sudo apt install nginx
# 安装 htpasswd 工具
sudo apt-get install apache2-utils  
#创建 .htpasswd 文件
sudo htpasswd -c /etc/nginx/.htpasswd cooper
#设置密码
123456

配置 Nginx 反向代理

假设你的 Flink Web UI 正在本地的 8081 端口运行,我们需要配置 Nginx 来做反向代理,并且开启基本认证。

打开并编辑 Nginx 配置文件,通常是
/etc/nginx/sites-available/default 或 /etc/nginx/nginx.conf,根据你的系统配置。

vim /etc/nginx/sites-available/default

设置独立nginx.conf

vim /etc/nginx/conf.d/flink.conf

server {
    listen 18081;  # Nginx 监听 8081 端口(外部访问)
    server_name 127.0.0.1;  

    location / {
        proxy_pass http://localhost:8081;  # 将请求转发到 Flink Web UI 的实际端口
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 启用基本认证
        auth_basic "Restricted Access";
        auth_basic_user_file /etc/nginx/.htpasswd;  # 指定密码文件
    }
}

重启

sudo systemctl reload nginx

账户密码

sudo htpasswd -c /etc/nginx/.htpasswd cooper
#设置密码
123456

http://127.0.0.1:18081/#/overview
cooper
123456

验证同步

查看 Flink UI

访问 http://<你的服务器IP>:8081 查看任务运行状态。

在 Elasticsearch 查询数据

curl -u root:'123456' "http://127.0.0.1:9400/win_user/_search?pretty"


#账户
elastic
#密码
123456
#es url
127.0.0.1:9400

如果配置过程中还遇到其他问题可以联系博主

相关推荐

服务器数据恢复—Raid5数据灾难不用愁,Raid5数据恢复原理了解下

Raid5数据恢复算法原理:分布式奇偶校验的独立磁盘结构(被称之为raid5)的数据恢复有一个“奇偶校验”的概念。可以简单的理解为二进制运算中的“异或运算”,通常使用的标识是xor。运算规则:若二者值...

服务器数据恢复—多次异常断电导致服务器raid不可用的数据恢复

服务器数据恢复环境&故障:由于机房多次断电导致一台服务器中raid阵列信息丢失。该阵列中存放的是文档,上层安装的是Windowsserver操作系统,没有配置ups。因为服务器异常断电重启后,rai...

服务器数据恢复-V7000存储更换磁盘数据同步失败的数据恢复案例

服务器数据恢复环境:P740+AIX+Sybase+V7000存储,存储阵列柜上共12块SAS机械硬盘(其中一块为热备盘)。服务器故障:存储阵列柜中有磁盘出现故障,工作人员发现后更换磁盘,新更换的磁盘...

「服务器数据恢复」重装系统导致XFS文件系统分区丢失的数据恢复

服务器数据恢复环境:DellPowerVault系列磁盘柜;用RAID卡创建的一组RAID5;分配一个LUN。服务器故障:在Linux系统层面对LUN进行分区,划分sdc1和sdc2两个分区。将sd...

服务器数据恢复-ESXi虚拟机被误删的数据恢复案例

服务器数据恢复环境:一台服务器安装的ESXi虚拟化系统,该虚拟化系统连接了多个LUN,其中一个LUN上运行了数台虚拟机,虚拟机安装WindowsServer操作系统。服务器故障&分析:管理员因误操作...

「服务器数据恢复」Raid5阵列两块硬盘亮黄灯掉线的数据恢复案例

服务器数据恢复环境:HPStorageWorks某型号存储;虚拟化平台为vmwareexsi;10块磁盘组成raid5(有1块热备盘)。服务器故障:raid5阵列中两块硬盘指示灯变黄掉线,无法读取...

服务器数据恢复—基于oracle数据库的SAP数据恢复案例

服务器存储数据恢复环境:某品牌服务器存储中有一组由6块SAS硬盘组建的RAID5阵列,其中有1块硬盘作为热备盘使用。上层划分若干lun,存放Oracle数据库数据。服务器存储故障&分析:该RAID5阵...

「服务器虚拟化数据恢复」Xen Server环境下数据库数据恢复案例

服务器虚拟化数据恢复环境:Dell某型号服务器;数块STAT硬盘通过raid卡组建的RAID10;XenServer服务器虚拟化系统;故障虚拟机操作系统:WindowsServer,部署Web服务...

服务器数据恢复—RAID故障导致oracle无法启动的数据恢复案例

服务器数据恢复环境:某品牌服务器中有一组由4块SAS磁盘做的RAID5磁盘阵列。该服务器操作系统为windowsserver,运行了一个单节点Oracle,数据存储为文件系统,无归档。该oracle...

服务器数据恢复—服务器磁盘阵列常见故障表现&amp;解决方案

RAID(磁盘阵列)是一种将多块物理硬盘整合成一个虚拟存储的技术,raid模块相当于一个存储管理的中间层,上层接收并执行操作系统及文件系统的数据读写指令,下层管理数据在各个物理硬盘上的存储及读写。相对...

「服务器数据恢复」IBM某型号服务器RAID5磁盘阵列数据恢复案例

服务器数据恢复环境:IBM某型号服务器;5块SAS硬盘组成RAID5磁盘阵列;存储划分为1个LUN和3个分区:第一个分区存放windowsserver系统,第二个分区存放SQLServer数据库,...

服务器数据恢复—Zfs文件系统下误删除文件如何恢复数据?

服务器故障:一台zfs文件系统服务器,管理员误操作删除服务器上的数据。服务器数据恢复过程:1、将故障服务器所有磁盘编号后取出,硬件工程师检测所有硬盘后没有发现有磁盘存在硬件故障。以只读方式将全部磁盘做...

服务器数据恢复—Linux+raid5服务器数据恢复案例

服务器数据恢复环境:某品牌linux操作系统服务器,服务器中有4块SAS接口硬盘组建一组raid5阵列。服务器中存放的数据有数据库、办公文档、代码文件等。服务器故障&检测:服务器在运行过程中突然瘫痪,...

服务器数据恢复—Sql Server数据库数据恢复案例

服务器数据恢复环境:一台安装windowsserver操作系统的服务器。一组由8块硬盘组建的RAID5,划分LUN供这台服务器使用。在windows服务器内装有SqlServer数据库。存储空间LU...

服务器数据恢复—阿里云ECS网站服务器数据恢复案例

云服务器数据恢复环境:阿里云ECS网站服务器,linux操作系统+mysql数据库。云服务器故障:在执行数据库版本更新测试时,在生产库误执行了本来应该在测试库执行的sql脚本,导致生产库部分表被tru...

取消回复欢迎 发表评论: