Remove naughty words from your data using DataCater
We all know we shouldn't use naughty words. Learn how to remove them from your streaming data using DataCater.
Censor your streaming data with Aiven and DataCater
Oh ****, everyone knows that bad words shouldn't be used!
Even though you might not expect it, profanity exists everywhere, whether it's a single person talking, a social media comment, or a live show. Giving visibility to such bad words can have disastrous effects, and, in the word of real-time streaming, we can't rely on a human check. Therefore we need to build streaming systems that automatically prevent bad words from being displayed. If you're interested in this, today's your lucky day, because we're going to create one with a couple of technologies and integrations!
In this blog post, we'll use the great Simpsons dataset, perfect for our scenario since it contains phrases with several bad words, like a live presenter cursing periodically. We'll demonstrate how we can clean up sentences in streaming mode using:
- Aiven for PostgreSQL®, everyone's beloved database, for storing the Simpsons dataset.
- ApiLayer Bad words API, for detecting bad words in the sentences.
- Aiven for OpenSearch®, for storing the polished Simpsons dataset; we’ll use search filters to investigate the results and the amazing OpenSearch Dashboard to visualize them.
- DataCater, an online service to define streaming data pipelines with an interesting option to write Python transformations; DataCater enables us to stream the data from PostgreSQL® to OpenSearch® and detect swears using ApiLayer’s Bad words API on the way.
Create the data storage
We need a PostgreSQL database that will act as storage for the sentences. For convenience, we are using Aiven for PostgreSQL; we can create it with the Aiven CLI and the following command:
avn service create demo-pg \ --service-type pg \ --plan hobbyist \ --cloud google-europe-west8
The minimal hobbyist
plan is all we need for the Simpsons dataset. We also chose the google-europe-west8
zone, one of the newest zones covered by Aiven, conveniently located in Milan. You can review the list of clouds available in the dedicated document.
We also need to create an Aiven for OpenSearch service which will contain the polished results:
avn service create demo-opensearch \ --service-type opensearch \ --plan hobbyist \ --cloud google-europe-west8
As before, we're using the hobbyist
plan and the same region. Let's wait for both services to be up with:
avn service wait demo-pg avn service wait demo-opensearch
Load the data in PostgreSQL
The Simpsons dialogue dataset is available in Kaggle. After creating a Kaggle account, we can download it as archive.zip
, and then decompress that to produce the data in CSV format in a file named simpsons_dataset.csv
. The file contains all the dialogues together with the character speaking each sentence.
To load it into the PostgreSQL database, we first need connect to the PostgreSQL command prompt with:
avn service cli demo-pg
Then we need to create a table called simpsons_dialogues
to host our dataset:
create table simpsons_dialogues( id serial primary key, character_speaking text, words text);
Copying the dataset over to PostgreSQL can be done using the PostgreSQL \copy
command:
\copy simpsons_dialogues(character_speaking, words) from 'simpsons_dataset.csv' csv header
The data should be ingested in a matter of seconds... Once the upload is finished, you can check there's some data in the table with the following query:
select count(*) from simpsons_dialogues;
Now it's time to play.
Define the transformation flow in DataCater
After importing the data, we need to define how we'll transform the data. We can head to DataCater and sign up for a free trial, entitling us to define up to two pipelines. Once our account is confirmed, we will land in the main dashboard where we can create the data endpoints of our pipeline.
Define the PostgreSQL source
We need to tell DataCater where to source the data and where to sink it. Let's start by mapping the source of data, by clicking on the Data Sources link at the top, and selecting Create a data source. The list of options available is quite wide, and includes PostgreSQL which we need to load the Simpsons dataset.
To define a datasource on top of Aiven for PostgreSQL we need to specify:
-
Name: the logical name of the datasource, we can use
simpsons_source_data
-
Hostname or IP, Port, Username and Password: the connection details to use to reach Aiven for PostgreSQL. We can find all the details with the following Aiven CLI command:
avn service get demo-pg --format '{service_uri_params}'
-
SSL: Aiven for PostgreSQL requires SSL, therefore we need to specify Use SSL
-
Database and Schema: the database and schema where the data is residing. We used the defaults, therefore the database is
defaultdb
and the schema ispublic
-
Table name: the table with the data, in our case it is
simpsons_dialogues
. We can either allow DataCater to fetch the list of tables and select from the dropdown, or fill it in explicitly.
We also need to change the Change Data Capture method, and select Logical replication with wal2json.
With all the fields filled, we can check the connection and, if successful, click on Create datasource.
Define the OpenSearch target
A similar exercise can be done with the OpenSearch index, that we'll use as a data pipeline target. We need to head to Data sinks and click on Create data sink, select Elasticsearch, and provide the following details:
-
Name: the logical name of the data sink, we can use
simpsons_sink_data
-
Hostname or IP, Port, Username and Password: the connection details to use to reach Aiven for PostgreSQL. We can find all the details with the following Aiven CLI command:
avn service get demo-opensearch --format '{service_uri_params}'
-
HTTP Scheme: we can use the
HTTPS
protocol, available by default in Aiven for openSearch -
Index name: we can either leave this blank (an index named
datacater_pipeline_[pipeline_id]
will be used), or fill it in. In our case we'll fill it in withsimpsons_cleaned_dialogues
Once we've checked that the connection is valid, we click on Create data sink to finalise the process.
Get the ApiLayer token and enable the Bad words API
As anticipated, we're going to use the ApiLayer Bad words API to clean-up the sentences, which means we need to setup an account. Once that's created and verified, we can subscribe to the ApiLayer Bad words API free plan (enough for our little test).
Remember to make a note of the API Key from your account details on the ApiLayer website. You'll need it down the road.
Polish all the sentences
Time to create our data pipeline.
Let's head back to DataCater and select the Pipelines tab at the top of the window and click on Create pipeline. We can now:
- Select the data source named
simpsons_source_data
pointing to our PostgreSQL table. - Select the data sink named
simpsons_cleaned_dialogues
pointing to our OpenSearch index.
Since ApiLayer Bad words API Free Plan only allows us to clean up to 100 sentences a day, we'll focus on the most recent sentences of the iconic character Bart Simpson
. To do this, let's navigate to the Filters tab and add the following filters:
- In the
character_speaking
column, add a filter of type Require values that equal value with the valueBart Simpson
- In the
id
column, add a filter of type Require values that are greater than value with the value157500
. The157500
value has been hand crafted accurately to have enough examples to play with, but not too much to exceed the API's free tier daily quota.
The Filters tab should look like the following image. Don't worry about the 100% drop rate, it's based on sample data, and we still have 54 rows to parse. You can check it with the following query in the PostgreSQL database
select count(*) from simpsons_dialogues where character_speaking = 'Bart Simpson' and id > 157500
Now we need to define the cleaning pipeline by navigating to the Transform tab, where the magic 🪄 happens.
If we click on the Apply transformation button below the character_speaking
column, we can check the long list of options available for transformations and select Capitalize to normalise all characters capitalization.
Then we can move to the words
column and, apply the User defined transformation which allows us to write Python, amazing! The DataCater Code Transformations documentation says that we can use the requests module to perform API calls to ApiLayer.
With Python we can call the ApiLayer Bad Words API, pass the phrase and store the cleanup response. Pasting the following code (taken from the ApiLayer documentation) and replacing the APILAYER_KEY
placeholder with the value we saved earlier from the ApiLayer website will do the trick:
import requests url = "https://api.apilayer.com/bad_words?censor_character={censor_character}" headers= { "apikey": "APILAYER_KEY" } def transform(value, row): result = '' if row['words'] is not None: payload = row['words'].encode("utf-8") response = requests.request("POST", url, headers=headers, data = payload) status_code = response.status_code result = response.text if result is None: result='' return str(result)
In the above code, we're setting the headers and the URL, and then retrieving the words
column from the current row (row['words]
), encoding it and passing it to the ApiLayer API, and then finally parsing the result.
Once we've finished our transformation definition, the tab should be similar to the following image.
Deploy the pipeline and check the results
It's finally time to deploy the pipeline. We can head to the Deploy tab and click on Create deployment. We just need to wait a couple of seconds for the deployment to be created and then we can hit the Start button. The DataCater UI allows us also to browse the deployment logs, which is quite handy to spot if something goes wrong.
Where's our data gone? We can check it in OpenSearch Dashboards, available alongside our Aiven for OpenSearch. We can find the URL and login credentials with:
avn service get demo-opensearch --json \ | jq -r '.connection_info.opensearch_dashboards_uri'
We can head to Stack Management select Index Patterns, and create an index pattern with the name simpsons*
. If our data pipeline is working we should see that the simpsons_cleaned_dialogues
exists.
After creating the index pattern, we can head to the Discover tab, add a filter for words.bad_words_total
not being equal to 0, and review the dialogues that seem to contain bad words. Some of them are false positives... but hey better safe than sorry!
Is it really streaming?
As of now, we have only polished a static set of sentences. Can we demonstrate that the flow is acting in streaming mode too?
With our terminal connected to the PostgreSQL instance, we can now take a deep breath and run the following insert statement replacing the BAD_WORDS_PHRASE
with something really impolite:
insert into simpsons_dialogues (character_speaking, words) values ('Bart Simpson', 'BAD_WORDS_PHRASE');
And immediately we should see a new entry in the OpenSearch index. In my case... it worked!
By refreshing the OpenSearch Dashboard, scrolling to the bottom, checking the last entry and clicking on the little arrow >
next to the message, we can see the JSON output in detail, represented below with some *
polishing the bad words.
{ "_index": "thesimpsons", "_type": "_doc", "_id": "158315", "_version": 0, "_score": 0, "_source": { "id": 158315, "character_speaking": "Bart Simpson", "words": { "content": "D*** A** S***", "bad_words_total": 3, "bad_words_list": [ { "original": "D***", "word": "d***", "deviations": 0, "info": 2, "start": 0, "end": 4, "replacedLen": 4 }, { "original": "A**", "word": "a**", "deviations": 0, "info": 2, "start": 5, "end": 8, "replacedLen": 3 }, { "original": "S**", "word": "s**", "deviations": 0, "info": 2, "start": 9, "end": 13, "replacedLen": 4 } ], "censored_content": "**** *** ****" } } }
A real-time streaming solution that polishes sentences... This is **** brilliant!
Enhance streaming data pipelines with Python extensions
Jokes apart, this is just a simple example of what's achievable by plugging DataCater on top of Aiven services, like Aiven for PostgreSQL and Aiven for OpenSearch. The rich set of pre-cooked transformations allows you to cover a great part of the typical data manipulations needed, and, for the rest, there's always the Python extension handy.
Some more resources that you might find useful: