mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge remote-tracking branch 'origin/main' into use-node-postgres
This commit is contained in:
commit
f7e9cd235c
6 changed files with 184 additions and 129 deletions
|
|
@ -13,7 +13,8 @@
|
||||||
"admin:role": "deno run -A scripts/admin-role.ts",
|
"admin:role": "deno run -A scripts/admin-role.ts",
|
||||||
"setup": "deno run -A scripts/setup.ts",
|
"setup": "deno run -A scripts/setup.ts",
|
||||||
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
"stats:recompute": "deno run -A scripts/stats-recompute.ts",
|
||||||
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip"
|
"soapbox": "curl -O https://dl.soapbox.pub/main/soapbox.zip && mkdir -p public && mv soapbox.zip public/ && cd public/ && unzip soapbox.zip && rm soapbox.zip",
|
||||||
|
"trends": "deno run -A scripts/trends.ts"
|
||||||
},
|
},
|
||||||
"unstable": ["cron", "ffi", "kv", "worker-options"],
|
"unstable": ["cron", "ffi", "kv", "worker-options"],
|
||||||
"exclude": ["./public"],
|
"exclude": ["./public"],
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import { NSchema } from '@nostrify/nostrify';
|
import { NSchema } from '@nostrify/nostrify';
|
||||||
|
import { nip19 } from 'nostr-tools';
|
||||||
|
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||||
|
|
@ -8,7 +9,8 @@ import { nostrNow } from '@/utils.ts';
|
||||||
const kysely = await DittoDB.getInstance();
|
const kysely = await DittoDB.getInstance();
|
||||||
const eventsDB = new EventsDB(kysely);
|
const eventsDB = new EventsDB(kysely);
|
||||||
|
|
||||||
const [pubkey, role] = Deno.args;
|
const [pubkeyOrNpub, role] = Deno.args;
|
||||||
|
const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub;
|
||||||
|
|
||||||
if (!NSchema.id().safeParse(pubkey).success) {
|
if (!NSchema.id().safeParse(pubkey).success) {
|
||||||
console.error('Invalid pubkey');
|
console.error('Invalid pubkey');
|
||||||
|
|
|
||||||
44
scripts/trends.ts
Normal file
44
scripts/trends.ts
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
import {
|
||||||
|
updateTrendingEvents,
|
||||||
|
updateTrendingHashtags,
|
||||||
|
updateTrendingLinks,
|
||||||
|
updateTrendingPubkeys,
|
||||||
|
updateTrendingZappedEvents,
|
||||||
|
} from '@/trends.ts';
|
||||||
|
|
||||||
|
const trendSchema = z.enum(['pubkeys', 'zapped_events', 'events', 'hashtags', 'links']);
|
||||||
|
const trends = trendSchema.array().parse(Deno.args);
|
||||||
|
|
||||||
|
if (!trends.length) {
|
||||||
|
trends.push('pubkeys', 'zapped_events', 'events', 'hashtags', 'links');
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const trend of trends) {
|
||||||
|
switch (trend) {
|
||||||
|
case 'pubkeys':
|
||||||
|
console.log('Updating trending pubkeys...');
|
||||||
|
await updateTrendingPubkeys();
|
||||||
|
break;
|
||||||
|
case 'zapped_events':
|
||||||
|
console.log('Updating trending zapped events...');
|
||||||
|
await updateTrendingZappedEvents();
|
||||||
|
break;
|
||||||
|
case 'events':
|
||||||
|
console.log('Updating trending events...');
|
||||||
|
await updateTrendingEvents();
|
||||||
|
break;
|
||||||
|
case 'hashtags':
|
||||||
|
console.log('Updating trending hashtags...');
|
||||||
|
await updateTrendingHashtags();
|
||||||
|
break;
|
||||||
|
case 'links':
|
||||||
|
console.log('Updating trending links...');
|
||||||
|
await updateTrendingLinks();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log('Trends updated.');
|
||||||
|
Deno.exit(0);
|
||||||
88
src/cron.ts
88
src/cron.ts
|
|
@ -1,84 +1,12 @@
|
||||||
import { Stickynotes } from '@soapbox/stickynotes';
|
import { updateTrendingLinks } from '@/trends.ts';
|
||||||
|
import { updateTrendingHashtags } from '@/trends.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { updateTrendingEvents, updateTrendingPubkeys, updateTrendingZappedEvents } from '@/trends.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
|
||||||
import { handleEvent } from '@/pipeline.ts';
|
|
||||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
|
||||||
import { getTrendingTagValues } from '@/trends/trending-tag-values.ts';
|
|
||||||
import { Time } from '@/utils/time.ts';
|
|
||||||
|
|
||||||
const console = new Stickynotes('ditto:trends');
|
|
||||||
|
|
||||||
async function updateTrendingTags(
|
|
||||||
l: string,
|
|
||||||
tagName: string,
|
|
||||||
kinds: number[],
|
|
||||||
limit: number,
|
|
||||||
extra = '',
|
|
||||||
aliases?: string[],
|
|
||||||
) {
|
|
||||||
console.info(`Updating trending ${l}...`);
|
|
||||||
const kysely = await DittoDB.getInstance();
|
|
||||||
const signal = AbortSignal.timeout(1000);
|
|
||||||
|
|
||||||
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
|
||||||
const now = Math.floor(Date.now() / 1000);
|
|
||||||
|
|
||||||
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
|
|
||||||
|
|
||||||
const trends = await getTrendingTagValues(kysely, tagNames, {
|
|
||||||
kinds,
|
|
||||||
since: yesterday,
|
|
||||||
until: now,
|
|
||||||
limit,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!trends.length) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const signer = new AdminSigner();
|
|
||||||
|
|
||||||
const label = await signer.signEvent({
|
|
||||||
kind: 1985,
|
|
||||||
content: '',
|
|
||||||
tags: [
|
|
||||||
['L', 'pub.ditto.trends'],
|
|
||||||
['l', l, 'pub.ditto.trends'],
|
|
||||||
...trends.map(({ value, authors, uses }) => [tagName, value, extra, authors.toString(), uses.toString()]),
|
|
||||||
],
|
|
||||||
created_at: Math.floor(Date.now() / 1000),
|
|
||||||
});
|
|
||||||
|
|
||||||
await handleEvent(label, signal);
|
|
||||||
console.info(`Trending ${l} updated.`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Start cron jobs for the application. */
|
/** Start cron jobs for the application. */
|
||||||
export function cron() {
|
export function cron() {
|
||||||
Deno.cron(
|
Deno.cron('update trending pubkeys', '0 * * * *', updateTrendingPubkeys);
|
||||||
'update trending pubkeys',
|
Deno.cron('update trending zapped events', '7 * * * *', updateTrendingZappedEvents);
|
||||||
'0 * * * *',
|
Deno.cron('update trending events', '15 * * * *', updateTrendingEvents);
|
||||||
() => updateTrendingTags('#p', 'p', [1, 3, 6, 7, 9735], 40, Conf.relay),
|
Deno.cron('update trending hashtags', '30 * * * *', updateTrendingHashtags);
|
||||||
);
|
Deno.cron('update trending links', '45 * * * *', updateTrendingLinks);
|
||||||
Deno.cron(
|
|
||||||
'update trending zapped events',
|
|
||||||
'7 * * * *',
|
|
||||||
() => updateTrendingTags('zapped', 'e', [9735], 40, Conf.relay, ['q']),
|
|
||||||
);
|
|
||||||
Deno.cron(
|
|
||||||
'update trending events',
|
|
||||||
'15 * * * *',
|
|
||||||
() => updateTrendingTags('#e', 'e', [1, 6, 7, 9735], 40, Conf.relay, ['q']),
|
|
||||||
);
|
|
||||||
Deno.cron(
|
|
||||||
'update trending hashtags',
|
|
||||||
'30 * * * *',
|
|
||||||
() => updateTrendingTags('#t', 't', [1], 20),
|
|
||||||
);
|
|
||||||
Deno.cron(
|
|
||||||
'update trending links',
|
|
||||||
'45 * * * *',
|
|
||||||
() => updateTrendingTags('#r', 'r', [1], 20),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
127
src/trends.ts
Normal file
127
src/trends.ts
Normal file
|
|
@ -0,0 +1,127 @@
|
||||||
|
import { NostrFilter } from '@nostrify/nostrify';
|
||||||
|
import { Stickynotes } from '@soapbox/stickynotes';
|
||||||
|
import { Kysely } from 'kysely';
|
||||||
|
|
||||||
|
import { Conf } from '@/config.ts';
|
||||||
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
|
import { DittoTables } from '@/db/DittoTables.ts';
|
||||||
|
import { handleEvent } from '@/pipeline.ts';
|
||||||
|
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||||
|
import { Time } from '@/utils/time.ts';
|
||||||
|
|
||||||
|
const console = new Stickynotes('ditto:trends');
|
||||||
|
|
||||||
|
/** Get trending tag values for a given tag in the given time frame. */
|
||||||
|
export async function getTrendingTagValues(
|
||||||
|
/** Kysely instance to execute queries on. */
|
||||||
|
kysely: Kysely<DittoTables>,
|
||||||
|
/** Tag name to filter by, eg `t` or `r`. */
|
||||||
|
tagNames: string[],
|
||||||
|
/** Filter of eligible events. */
|
||||||
|
filter: NostrFilter,
|
||||||
|
): Promise<{ value: string; authors: number; uses: number }[]> {
|
||||||
|
let query = kysely
|
||||||
|
.selectFrom('nostr_tags')
|
||||||
|
.innerJoin('nostr_events', 'nostr_events.id', 'nostr_tags.event_id')
|
||||||
|
.select(({ fn }) => [
|
||||||
|
'nostr_tags.value',
|
||||||
|
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
|
||||||
|
fn.countAll<number>().as('uses'),
|
||||||
|
])
|
||||||
|
.where('nostr_tags.name', 'in', tagNames)
|
||||||
|
.groupBy('nostr_tags.value')
|
||||||
|
.orderBy((c) => c.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
|
||||||
|
|
||||||
|
if (filter.kinds) {
|
||||||
|
query = query.where('nostr_events.kind', 'in', filter.kinds);
|
||||||
|
}
|
||||||
|
if (typeof filter.since === 'number') {
|
||||||
|
query = query.where('nostr_events.created_at', '>=', filter.since);
|
||||||
|
}
|
||||||
|
if (typeof filter.until === 'number') {
|
||||||
|
query = query.where('nostr_events.created_at', '<=', filter.until);
|
||||||
|
}
|
||||||
|
if (typeof filter.limit === 'number') {
|
||||||
|
query = query.limit(filter.limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
const rows = await query.execute();
|
||||||
|
|
||||||
|
return rows.map((row) => ({
|
||||||
|
value: row.value,
|
||||||
|
authors: Number(row.authors),
|
||||||
|
uses: Number(row.uses),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Get trending tags and publish an event with them. */
|
||||||
|
export async function updateTrendingTags(
|
||||||
|
l: string,
|
||||||
|
tagName: string,
|
||||||
|
kinds: number[],
|
||||||
|
limit: number,
|
||||||
|
extra = '',
|
||||||
|
aliases?: string[],
|
||||||
|
) {
|
||||||
|
console.info(`Updating trending ${l}...`);
|
||||||
|
const kysely = await DittoDB.getInstance();
|
||||||
|
const signal = AbortSignal.timeout(1000);
|
||||||
|
|
||||||
|
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
||||||
|
const now = Math.floor(Date.now() / 1000);
|
||||||
|
|
||||||
|
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
|
||||||
|
|
||||||
|
const trends = await getTrendingTagValues(kysely, tagNames, {
|
||||||
|
kinds,
|
||||||
|
since: yesterday,
|
||||||
|
until: now,
|
||||||
|
limit,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!trends.length) {
|
||||||
|
console.info(`No trending ${l} found. Skipping.`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const signer = new AdminSigner();
|
||||||
|
|
||||||
|
const label = await signer.signEvent({
|
||||||
|
kind: 1985,
|
||||||
|
content: '',
|
||||||
|
tags: [
|
||||||
|
['L', 'pub.ditto.trends'],
|
||||||
|
['l', l, 'pub.ditto.trends'],
|
||||||
|
...trends.map(({ value, authors, uses }) => [tagName, value, extra, authors.toString(), uses.toString()]),
|
||||||
|
],
|
||||||
|
created_at: Math.floor(Date.now() / 1000),
|
||||||
|
});
|
||||||
|
|
||||||
|
await handleEvent(label, signal);
|
||||||
|
console.info(`Trending ${l} updated.`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update trending pubkeys. */
|
||||||
|
export function updateTrendingPubkeys(): Promise<void> {
|
||||||
|
return updateTrendingTags('#p', 'p', [1, 3, 6, 7, 9735], 40, Conf.relay);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update trending zapped events. */
|
||||||
|
export function updateTrendingZappedEvents(): Promise<void> {
|
||||||
|
return updateTrendingTags('zapped', 'e', [9735], 40, Conf.relay, ['q']);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update trending events. */
|
||||||
|
export function updateTrendingEvents(): Promise<void> {
|
||||||
|
return updateTrendingTags('#e', 'e', [1, 6, 7, 9735], 40, Conf.relay, ['q']);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update trending hashtags. */
|
||||||
|
export function updateTrendingHashtags(): Promise<void> {
|
||||||
|
return updateTrendingTags('#t', 't', [1], 20);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update trending links. */
|
||||||
|
export function updateTrendingLinks(): Promise<void> {
|
||||||
|
return updateTrendingTags('#r', 'r', [1], 20);
|
||||||
|
}
|
||||||
|
|
@ -1,47 +0,0 @@
|
||||||
import { NostrFilter } from '@nostrify/nostrify';
|
|
||||||
import { Kysely } from 'kysely';
|
|
||||||
|
|
||||||
import { DittoTables } from '@/db/DittoTables.ts';
|
|
||||||
|
|
||||||
/** Get trending tag values for a given tag in the given time frame. */
|
|
||||||
export async function getTrendingTagValues(
|
|
||||||
/** Kysely instance to execute queries on. */
|
|
||||||
kysely: Kysely<DittoTables>,
|
|
||||||
/** Tag name to filter by, eg `t` or `r`. */
|
|
||||||
tagNames: string[],
|
|
||||||
/** Filter of eligible events. */
|
|
||||||
filter: NostrFilter,
|
|
||||||
): Promise<{ value: string; authors: number; uses: number }[]> {
|
|
||||||
let query = kysely
|
|
||||||
.selectFrom('nostr_tags')
|
|
||||||
.innerJoin('nostr_events', 'nostr_events.id', 'nostr_tags.event_id')
|
|
||||||
.select(({ fn }) => [
|
|
||||||
'nostr_tags.value',
|
|
||||||
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
|
|
||||||
fn.countAll<number>().as('uses'),
|
|
||||||
])
|
|
||||||
.where('nostr_tags.name', 'in', tagNames)
|
|
||||||
.groupBy('nostr_tags.value')
|
|
||||||
.orderBy((c) => c.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
|
|
||||||
|
|
||||||
if (filter.kinds) {
|
|
||||||
query = query.where('nostr_events.kind', 'in', filter.kinds);
|
|
||||||
}
|
|
||||||
if (typeof filter.since === 'number') {
|
|
||||||
query = query.where('nostr_events.created_at', '>=', filter.since);
|
|
||||||
}
|
|
||||||
if (typeof filter.until === 'number') {
|
|
||||||
query = query.where('nostr_events.created_at', '<=', filter.until);
|
|
||||||
}
|
|
||||||
if (typeof filter.limit === 'number') {
|
|
||||||
query = query.limit(filter.limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
const rows = await query.execute();
|
|
||||||
|
|
||||||
return rows.map((row) => ({
|
|
||||||
value: row.value,
|
|
||||||
authors: Number(row.authors),
|
|
||||||
uses: Number(row.uses),
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
Loading…
Add table
Reference in a new issue